summaryrefslogtreecommitdiff
path: root/indra/llcommon/lleventcoro.cpp
blob: e1fc4764f66a0b58955885fd28d8bf84ea96f8dd (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
/**
 * @file   lleventcoro.cpp
 * @author Nat Goodspeed
 * @date   2009-04-29
 * @brief  Implementation for lleventcoro.
 *
 * $LicenseInfo:firstyear=2009&license=viewerlgpl$
 * Second Life Viewer Source Code
 * Copyright (C) 2010, Linden Research, Inc.
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation;
 * version 2.1 of the License only.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
 *
 * Linden Research, Inc., 945 Battery Street, San Francisco, CA  94111  USA
 * $/LicenseInfo$
 */

// Precompiled header
#include "linden_common.h"
// associated header
#include "lleventcoro.h"
// STL headers
#include <chrono>
#include <exception>
// std headers
// external library headers
#include <boost/fiber/operations.hpp>
// other Linden headers
#include "llsdserialize.h"
#include "llsdutil.h"
#include "llerror.h"
#include "llcoros.h"
#include "stringize.h"

namespace
{

/**
 * Choose the listener name the calling coroutine uses when it temporarily
 * listens on an LLEventPump.
 *
 * suspendUntilEventOn() lets a coroutine listen on an LLEventPump any number
 * of times, and we don't want to make the caller label every such call with
 * a distinct string: the whole point is to present a nice sequential
 * interface over the LLEventPump-with-named-listeners machinery. Generating
 * a fresh LLEventPump::inventName() for every call would work, but that
 * namespace, though large, is finite. So when the running coroutine has a
 * name known to LLCoros, we reuse that name; only when LLCoros::getName()
 * comes back empty do we fall back to inventing one.
 *
 * @return the LLCoros name of the running coroutine if it has one, otherwise
 *         a newly invented name (which is also logged).
 */
std::string listenerNameForCoro()
{
    // A coroutine launched by LLCoros::launch() already carries a name.
    std::string coroName(LLCoros::getName());
    if (coroName.empty())
    {
        // LLCoros has no name for us: have LLEventPump generate one, and
        // log it so the listener can be traced back to this call.
        coroName = LLEventPump::inventName("coro");
        LL_INFOS("LLEventCoro") << "listenerNameForCoro(): inventing coro name '"
                                << coroName << "'" << LL_ENDL;
    }
    return coroName;
}

/**
 * Implement behavior described for postAndSuspend()'s @a replyPumpNamePath
 * parameter:
 *
 * * If <tt>path.isUndefined()</tt>, do nothing.
 * * If <tt>path.isString()</tt>, @a dest is an LLSD map: store @a value
 *   into <tt>dest[path.asString()]</tt>.
 * * If <tt>path.isInteger()</tt>, @a dest is an LLSD array: store @a
 *   value into <tt>dest[path.asInteger()]</tt>.
 * * If <tt>path.isArray()</tt>, iteratively apply the rules above to step
 *   down through the structure of @a dest. The last array entry in @a
 *   path specifies the entry in the lowest-level structure in @a dest
 *   into which to store @a value.
 *
 * @note
 * In the degenerate case in which @a path is an empty array, @a dest will
 * @em become @a value rather than @em containing it.
 */
void storeToLLSDPath(LLSD& dest, const LLSD& path, const LLSD& value)
{
    // An undefined path means "don't store anything": leave dest untouched.
    if (! path.isUndefined())
    {
        // llsd::drill_ref() walks 'path' down through 'dest' and hands back
        // a reference to the addressed slot; assign 'value' into it.
        llsd::drill_ref(dest, path) = value;
    }
}

} // anonymous

void llcoro::suspend()
{
    // Give LLCoros::checkStop() its chance to interrupt us before we yield.
    LLCoros::checkStop();
    // Label this coroutine's status for as long as we are parked here.
    LLCoros::TempStatus tickStatus("waiting one tick");
    // Hand control back to the fiber scheduler.
    boost::this_fiber::yield();
}

void llcoro::suspendUntilTimeout(float seconds)
{
    LLCoros::checkStop();
    // We used to call boost::this_fiber::sleep_for(). But some coroutines
    // (e.g. LLExperienceCache::idleCoro()) sit in a suspendUntilTimeout()
    // loop, in which case a sleep_for() call risks sleeping through shutdown.
    // So instead, listen for "LLApp" state-changing events -- which
    // fortunately is handled for us by suspendUntilEventOnWithTimeout().
    // Wait for an event on a bogus LLEventPump on which nobody ever posts
    // events. Don't make it static because that would force instantiation of
    // the LLEventPumps LLSingleton registry at static initialization time.
    // DO allow tweaking the name for uniqueness, this definitely gets
    // re-entered on multiple coroutines!
    // We could use an LLUUID if it were important to actively prohibit anyone
    // from ever posting on this LLEventPump.
    LLEventStream bogus("xyzzy", true);
    // Timeout is the NORMAL case for this call!
    static LLSD timedout;
    // Deliver, but ignore, timedout when (as usual) we did not receive any
    // "LLApp" event. The point is that suspendUntilEventOnWithTimeout() will
    // itself throw Stopping when "LLApp" starts broadcasting shutdown events.
    suspendUntilEventOnWithTimeout(bogus, seconds, timedout);
}

namespace
{

// returns a listener on replyPumpP, also on "mainloop" -- both should be
// stored in LLTempBoundListeners on the caller's stack frame
std::pair<LLBoundListener, LLBoundListener>
postAndSuspendSetup(const std::string& callerName,
                    const std::string& listenerName,
                    LLCoros::Promise<LLSD>& promise,
                    const LLSD& event,
                    const LLEventPumpOrPumpName& requestPumpP,
                    const LLEventPumpOrPumpName& replyPumpP,
                    const LLSD& replyPumpNamePath)
{
    // Before we get any farther -- should we be stopping instead of
    // suspending?
    LLCoros::checkStop();
    // Get the consuming attribute for THIS coroutine, the one that's about to
    // suspend. Don't call get_consuming() in the lambda body: that would
    // return the consuming attribute for some other coroutine, most likely
    // the main routine.
    bool consuming(LLCoros::get_consuming());
    // listen on the specified LLEventPump with a lambda that will assign a
    // value to the promise, thus fulfilling its future
    llassert_always_msg(replyPumpP, ("replyPump required for " + callerName));
    LLEventPump& replyPump(replyPumpP.getPump());
    // The relative order of the two listen() calls below would only matter if
    // "LLApp" were an LLEventMailDrop. But if we ever go there, we'd want to
    // notice the pending LLApp status first.
    LLBoundListener stopper(
        LLEventPumps::instance().obtain("LLApp").listen(
            listenerName,
            [&promise, listenerName](const LLSD& status)
            {
                // anything except "running" should wake up the waiting
                // coroutine
                auto& statsd = status["status"];
                if (statsd.asString() != "running")
                {
                    LL_DEBUGS("lleventcoro") << listenerName
                                             << " spotted status " << statsd
                                             << ", throwing Stopping" << LL_ENDL;
                    try
                    {
                        promise.set_exception(
                            std::make_exception_ptr(
                                LLCoros::Stopping("status " + statsd.asString())));
                    }
                    catch (const boost::fibers::promise_already_satisfied&)
                    {
                        LL_WARNS("lleventcoro") << listenerName
                                                << " couldn't throw Stopping "
                                                   "because promise already set" << LL_ENDL;
                    }
                }
                // do not consume -- every listener must see status
                return false;
            }));
    LLBoundListener connection(
        replyPump.listen(
            listenerName,
            [&promise, consuming, listenerName](const LLSD& result)
            {
                try
                {
                    promise.set_value(result);
                    // We did manage to propagate the result value to the
                    // (real) listener. If we're supposed to indicate that
                    // we've consumed it, do so.
                    return consuming;
                }
                catch(boost::fibers::promise_already_satisfied & ex)
                {
                    LL_DEBUGS("lleventcoro") << "promise already satisfied in '"
                        << listenerName << "': "  << ex.what() << LL_ENDL;
                    // We could not propagate the result value to the
                    // listener.
                    return false;
                }
            }));

    // skip the "post" part if requestPump is default-constructed
    if (requestPumpP)
    {
        LLEventPump& requestPump(requestPumpP.getPump());
        // If replyPumpNamePath is non-empty, store the replyPump name in the
        // request event.
        LLSD modevent(event);
        storeToLLSDPath(modevent, replyPumpNamePath, replyPump.getName());
        LL_DEBUGS("lleventcoro") << callerName << ": coroutine " << listenerName
                                 << " posting to " << requestPump.getName()
                                 << LL_ENDL;

        // *NOTE:Mani - Removed because modevent could contain user's hashed passwd.
        //                         << ": " << modevent << LL_ENDL;
        requestPump.post(modevent);
    }
    LL_DEBUGS("lleventcoro") << callerName << ": coroutine " << listenerName
                             << " about to wait on LLEventPump " << replyPump.getName()
                             << LL_ENDL;
    return { connection, stopper };
}

} // anonymous

LLSD llcoro::postAndSuspend(const LLSD& event, const LLEventPumpOrPumpName& requestPump,
                 const LLEventPumpOrPumpName& replyPump, const LLSD& replyPumpNamePath)
{
    // The promise must outlive both listeners, which refer to it: declare it
    // first so it is destroyed last.
    LLCoros::Promise<LLSD> promise;
    std::string listenerName(listenerNameForCoro());

    // Register the listeners and post the request. Wrapping each returned
    // connection in an LLTempBoundListener means both are implicitly
    // disconnected when we leave this function, however we leave it.
    auto listeners =
        postAndSuspendSetup("postAndSuspend()", listenerName, promise,
                            event, requestPump, replyPump, replyPumpNamePath);
    LLTempBoundListener replyListener(listeners.first);
    LLTempBoundListener stopListener(listeners.second);

    // Obtain the future tied to our promise...
    LLCoros::Future<LLSD> future = LLCoros::getFuture(promise);
    // ...and block on it: get() suspends this coroutine until one of the
    // listeners satisfies the promise.
    LLCoros::TempStatus waitStatus(STRINGIZE("waiting for " << replyPump.getPump().getName()));
    LLSD reply(future.get());
    LL_DEBUGS("lleventcoro") << "postAndSuspend(): coroutine " << listenerName
                             << " resuming with " << reply << LL_ENDL;
    // Returning tears down both temporary listeners.
    return reply;
}

// Like postAndSuspend(), but give up waiting after 'timeout' seconds.
//
// event, requestPump, replyPump, replyPumpNamePath
//                are passed straight through to postAndSuspendSetup()
// timeout        maximum time to wait, in (possibly fractional) seconds;
//                honored with millisecond granularity
// timeoutResult  value returned if no reply arrives within 'timeout'
//
// Returns the reply event, or a copy of 'timeoutResult' on timeout. If the
// "LLApp" listener stored an exception in the promise, future.get() is where
// it surfaces.
LLSD llcoro::postAndSuspendWithTimeout(const LLSD& event,
                                       const LLEventPumpOrPumpName& requestPump,
                                       const LLEventPumpOrPumpName& replyPump,
                                       const LLSD& replyPumpNamePath,
                                       F32 timeout, const LLSD& timeoutResult)
{
    LLCoros::Promise<LLSD> promise;
    std::string listenerName(listenerNameForCoro());

    // Store both connections into LLTempBoundListeners so we implicitly
    // disconnect on return from this function.
    auto connections =
        postAndSuspendSetup("postAndSuspendWithTimeout()", listenerName, promise,
                            event, requestPump, replyPump, replyPumpNamePath);
    LLTempBoundListener connection(connections.first), stopper(connections.second);

    // declare the future
    LLCoros::Future<LLSD> future = LLCoros::getFuture(promise);
    // wait for specified timeout
    boost::fibers::future_status status;
    {
        LLCoros::TempStatus st(STRINGIZE("waiting for " << replyPump.getPump().getName()
                                         << " for " << timeout << "s"));
        // The fact that we accept non-integer seconds means we should probably
        // use granularity finer than one second. However, given the overhead of
        // the rest of our processing, it seems silly to use granularity finer
        // than a millisecond.
        // Convert via duration_cast rather than through 'long': where long is
        // only 32 bits, timeout * 1000 stops fitting after roughly 24 days'
        // worth of milliseconds, and an out-of-range float-to-integer
        // conversion is undefined behavior. duration_cast converts straight
        // to the milliseconds representation, which is at least 45 bits wide.
        const auto waitTime =
            std::chrono::duration_cast<std::chrono::milliseconds>(
                std::chrono::duration<F32>(timeout));
        status = future.wait_for(waitTime);
    }
    // if the future is NOT yet ready, return timeoutResult instead
    if (status == boost::fibers::future_status::timeout)
    {
        LL_DEBUGS("lleventcoro") << "postAndSuspendWithTimeout(): coroutine " << listenerName
                                 << " timed out after " << timeout << " seconds,"
                                 << " resuming with " << timeoutResult << LL_ENDL;
        return timeoutResult;
    }
    else
    {
        llassert_always(status == boost::fibers::future_status::ready);

        // future is now ready, no more waiting
        LLSD value(future.get());
        LL_DEBUGS("lleventcoro") << "postAndSuspendWithTimeout(): coroutine " << listenerName
                                 << " resuming with " << value << LL_ENDL;
        // returning should disconnect the connection
        return value;
    }
}