1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
|
-- Lua implementation of LEAP (LLSD Event API Plugin) protocol
--
-- This module supports Lua scripts run by the Second Life viewer.
--
-- LEAP protocol passes LLSD objects, converted to/from Lua tables, in both
-- directions. A typical LLSD object is a map containing keys 'pump' and
-- 'data'.
--
-- The viewer's Lua LL.post_on(pump, data) function posts 'data' to the
-- LLEventPump 'pump'. This is typically used to engage an LLEventAPI method.
--
-- Similarly, the viewer gives each Lua script its own LLEventPump with a
-- unique name. That name is returned (together with the name of the command
-- pump) by LL.get_event_pumps(). Every event received on that LLEventPump is
-- queued for retrieval by LL.get_event_next(), which returns (pump, data):
-- the name of the LLEventPump on which the event was received and the
-- received event data. When the queue is empty, LL.get_event_next() blocks
-- the calling Lua script until the next event is received.
--
-- Usage:
-- 1. Launch some number of Lua coroutines. The code in each coroutine may
-- call leap.send(), leap.request() or leap.generate(). leap.send() returns
-- immediately ("fire and forget"). leap.request() blocks the calling
-- coroutine until it receives and returns the viewer's response to its
-- request. leap.generate() expects an arbitrary number of responses to the
-- original request.
-- 2. To handle events from the viewer other than direct responses to
-- requests, instantiate a leap.WaitFor object with a filter(pump, data)
-- override method that returns non-nil for desired events. A coroutine may
-- call wait() on any such WaitFor.
-- 3. Once the coroutines have been launched, call leap.process() on the main
-- coroutine. process() retrieves incoming events from the viewer and
-- dispatches them to waiting request() or generate() calls, or to
-- appropriate WaitFor instances. process() returns when either
-- get_event_next() raises an error or the viewer posts nil to the script's
-- reply pump to indicate it's done.
-- 4. Alternatively, a running coroutine may call leap.done() to break out of
-- leap.process(). process() won't notice until the next event from the
-- viewer, though.
-- fiber: provides set_idle(), through which this module installs its
-- event-fetching idle callback (see fiber.set_idle() below).
local fiber = require('fiber')
-- ErrorQueue: the queue type behind every WaitFor object; this module uses
-- its new(), Enqueue(), Dequeue(), Error() and close() methods.
local ErrorQueue = require('ErrorQueue')
-- Tracing is off by default: debug() below is a do-nothing stub. To trace,
-- remove the stub and enable the printf-based definition instead.
-- Note that this local shadows the standard 'debug' library within this file.
-- local debug = require('printf')
local function debug(...) end
-- The module table; it is what this file returns to require() callers.
local leap = {}
-- _reply: string name of the reply LLEventPump. Any event the viewer posts to
-- this pump will be queued for LL.get_event_next(). We usually specify it as
-- the reply pump for requests to internal viewer services.
-- _command: string name of the command LLEventPump. LL.post_on(_command, ...)
-- engages LLLeapListener operations such as listening on a specified other
-- LLEventPump, etc.
leap._reply, leap._command = LL.get_event_pumps()
-- Dict of features added to the LEAP protocol since the baseline
-- implementation. Before engaging a new feature that might break an older
-- viewer, we can check for the presence of that feature key. This table is
-- solely about the LEAP protocol itself, the way we communicate with the
-- viewer. To discover whether a given listener exists, or supports a
-- particular operation, use _command's "getAPI" operation.
-- For Lua, _command's "getFeatures" operation suffices?
-- leap._features = {}
-- Each outstanding request() or generate() call has a corresponding
-- WaitForReqid object (later in this module) to handle the
-- response(s). If an incoming event contains an echoed ["reqid"] key,
-- we can look up the appropriate WaitForReqid object more efficiently
-- in a dict than by tossing such objects into the usual waitfors list.
-- Note: the ["reqid"] must be unique, otherwise we could end up
-- replacing an earlier WaitForReqid object in leap._pending with a
-- later one. That would mean no incoming event is ever given to the
-- old WaitForReqid object, so any coroutine waiting on the discarded
-- WaitForReqid object would wait forever.
-- Both leap._pending and leap._waitfors are weak-valued tables
-- (__mode='v'): an entry vanishes once nothing else references its value.
local weak_values = {__mode='v'}
leap._pending = setmetatable({}, weak_values)
-- Our consumer will instantiate some number of WaitFor subclass objects.
-- As these are traversed in descending priority order, we must keep
-- them in a list. Because the values are weak, a collected WaitFor leaves
-- a nil hole in this list.
leap._waitfors = setmetatable({}, weak_values)
-- It has been suggested that we should use UUIDs as ["reqid"] values,
-- since UUIDs are guaranteed unique. However, as the "namespace" for
-- ["reqid"] values is our very own _reply pump, we can get away with
-- an integer. requestSetup() increments this counter for each new request.
leap._reqid = 0
-- Set to true by leap.done(); the idle callback installed with
-- fiber.set_idle() checks it and stops fetching viewer events.
leap._done = false
-- Accessor for the reply pump: returns the string stored in leap._reply.
function leap.replypump()
    local reply_name = leap._reply
    return reply_name
end
-- Accessor for the command pump: returns the string stored in leap._command.
function leap.cmdpump()
    local command_name = leap._command
    return command_name
end
-- Fire and forget: post the specified request LLSD and return immediately,
-- expecting no reply. Should the request nonetheless produce a reply, that
-- reply will be treated as an unsolicited event.
--
-- pump:  name of the LLEventPump to post to
-- data:  request payload. A table is shallow-copied (the caller's table is
--        left untouched) and the copy is stamped with ['reply'] =
--        leap._reply; any other value is posted as is.
-- reqid: optional; when not nil it is stamped into the copy as ['reqid'].
--
-- See also request(), generate().
function leap.send(pump, data, reqid)
    debug('leap.send(%s, %s, %s) entry', pump, data, reqid)
    local payload = data
    if type(payload) == 'table' then
        -- work on a copy so the stamps below never leak into the caller's table
        payload = table.clone(payload)
        payload.reply = leap._reply
        if reqid ~= nil then
            payload.reqid = reqid
        end
    end
    debug('leap.send(%s, %s) calling post_on()', pump, payload)
    LL.post_on(pump, payload)
end
-- Common setup code shared by request() and generate().
--
-- Invents a new, unique reqid, creates the WaitForReqid object that will
-- receive the matching replies, records that object in leap._pending so
-- dispatch() can find it, and sends the request stamped with the reqid.
--
-- pump: name of the LLEventPump to post the request to
-- data: request payload, passed through to send()
--
-- Returns reqid, waitfor. A caller that captures only the first result keeps
-- working unchanged.
local function requestSetup(pump, data)
    -- invent a new, unique reqid
    leap._reqid += 1
    local reqid = leap._reqid
    -- Instantiate a new WaitForReqid object. The priority is irrelevant
    -- because, unlike the WaitFor base class, WaitForReqid does not
    -- self-register on our leap._waitfors list.
    --
    -- leap._pending holds its values weakly, so an object referenced only
    -- from there may be garbage-collected at any allocation. Keep it in a
    -- local for the rest of this function: send() clones the request table,
    -- and the entry must survive that until the caller retrieves it.
    local waitfor = leap.WaitForReqid:new(reqid)
    -- Capture the new WaitForReqid object in leap._pending so dispatch()
    -- can find it.
    leap._pending[reqid] = waitfor
    -- Pass reqid to send() to stamp it into (a copy of) the request data.
    debug('requestSetup(%s, %s)', pump, data)
    leap.send(pump, data, reqid)
    return reqid, waitfor
end
-- Send the specified request LLSD, expecting exactly one reply, and block
-- the calling coroutine until that reply arrives. Returns the reply.
--
-- Every request() (or generate()) LLSD block we send is stamped with a
-- distinct ["reqid"] value. The requested event API must echo that same
-- ["reqid"] field in each reply associated with the request; that is how
-- interleaved replies from different requests are dispatched correctly.
--
-- If the desired event API doesn't support the ["reqid"] echo convention,
-- use send() instead -- request() or generate() would wait forever for a
-- reply stamped with that ["reqid"] -- and intercept any replies using
-- WaitFor.
--
-- For table-valued request data, send() stamps ["reply"] = leap._reply into
-- a copy of the data (replacing any ["reply"] key already present), so the
-- reply is directed to this script's reply pump.
--
-- An error raised while waiting is re-raised to the caller after the
-- temporary WaitForReqid entry has been removed.
--
-- See also send(), generate().
function leap.request(pump, data)
    local reqid = requestSetup(pump, data)
    local waiter = leap._pending[reqid]
    debug('leap.request(%s, %s) about to wait on %s', pump, data, tostring(waiter))
    local succeeded, result = pcall(waiter.wait, waiter)
    debug('leap.request(%s, %s) got %s: %s', pump, data, succeeded, result)
    -- drop the temporary WaitForReqid object whether or not wait() failed
    leap._pending[reqid] = nil
    if not succeeded then
        error(result)
    end
    return result
end
-- Send the specified request LLSD, expecting an arbitrary number of replies.
-- Each reply is yielded (coroutine.yield) on receipt. If you omit checklast,
-- this is an infinite generator: it is up to the caller to recognize when
-- the last reply has been received and stop resuming for more.
--
-- If you pass checklast=<callable accepting(event)>, each response event is
-- passed to that callable after the yield. When the callable returns a true
-- value, the generator terminates in the usual way.
--
-- An error raised while waiting ends the loop and is re-raised once the
-- WaitForReqid entry has been removed.
--
-- See request() remarks about ["reqid"].
function leap.generate(pump, data, checklast)
    -- Invent a new, unique reqid, arrange to handle incoming events bearing
    -- that reqid, stamp the outbound request with it, and send it.
    local reqid = requestSetup(pump, data)
    local waiter = leap._pending[reqid]
    local succeeded, event
    while true do
        succeeded, event = pcall(waiter.wait, waiter)
        if not succeeded then
            break
        end
        coroutine.yield(event)
        -- only after the consumer resumes us do we ask whether that was the last one
        if checklast and checklast(event) then
            break
        end
    end
    -- However the loop ended, error or not, clean up.
    leap._pending[reqid] = nil
    if not succeeded then
        error(event)
    end
end
-- We're done: close every outstanding waiter so blocked coroutines can
-- finish. Closes the WaitForReqid objects in leap._pending first, then the
-- registered WaitFor objects in leap._waitfors.
--
-- message: the reason for shutting down as supplied by the caller; it is
--          accepted but not used here.
local function cleanup(message)
    for _, group in ipairs({leap._pending, leap._waitfors}) do
        for _, waiter in pairs(group) do
            waiter:close()
        end
    end
end
-- Handle an incoming (pump, data) event with no recognizable ['reqid'].
--
-- Offers the event to each WaitFor in leap._waitfors in turn; the first one
-- whose handle() returns a true value claims it. Since the list is
-- maintained in descending priority order, that is the highest-priority
-- taker. An event nobody claims is reported via LL.print_debug() and
-- dropped.
local function unsolicited(pump, data)
    local claimed = false
    for _, candidate in pairs(leap._waitfors) do
        debug('unsolicited() checking %s', candidate.name)
        if candidate:handle(pump, data) then
            claimed = true
            break
        end
    end
    if not claimed then
        local notice = string.format('unsolicited(%s, %s) discarding unclaimed event', pump, data)
        LL.print_debug(notice)
    end
end
-- Route an incoming (pump, data) event to the appropriate waiting coroutine.
--
-- An event whose data is a table carrying a ['reqid'] for which a
-- WaitForReqid is registered in leap._pending goes to that object. Every
-- other event -- no 'reqid', an unknown 'reqid', or data that is not a
-- table at all -- is treated as unsolicited.
local function dispatch(pump, data)
    -- Only a table can carry an echoed 'reqid'. send() accepts non-table
    -- payloads, so accept them on the way in as well: indexing a number or
    -- boolean would raise an error instead of delivering the event.
    local reqid = nil
    if type(data) == 'table' then
        reqid = data['reqid']
    end
    -- if the response has no 'reqid', it's not from request() or generate();
    -- if it has one, do we (still) have a WaitForReqid for it?
    local waitfor = nil
    if reqid ~= nil then
        waitfor = leap._pending[reqid]
    end
    if waitfor == nil then
        return unsolicited(pump, data)
    end
    -- found the right WaitForReqid object, let it handle the event
    waitfor:handle(pump, data)
end
-- Install our idle callback with fiber.set_idle(). fiber.yield() calls the
-- configured idle callback whenever there are waiting fibers but no ready
-- fibers. In our case, that means it's time to fetch another incoming
-- viewer event.
--
-- The callback returns 'done' after leap.done() has been called, returns
-- 'end' when the viewer signals it is finished, raises an error if
-- LL.get_event_next() raised one, and otherwise dispatches the event and
-- returns nothing.
fiber.set_idle(function ()
    -- Someone has called leap.done(): tell fiber.yield() to break its loop.
    if leap._done then
        cleanup('done')
        return 'done'
    end

    debug('leap.idle() calling get_event_next()')
    local fetched, pump, data = pcall(LL.get_event_next)
    debug('leap.idle() got %s: %s, %s', fetched, pump, data)

    if not fetched then
        -- get_event_next() raised a Lua error; pcall() put its message in
        -- the slot that otherwise holds the pump name
        local message = pump
        cleanup(message)
        error(message)
    elseif not data then
        -- no data means get_event_next() returned (pump, LLSD()) to
        -- indicate the viewer is done with us
        cleanup('end')
        return 'end'
    end

    -- Got a real (pump, data) pair. After dispatching, return to
    -- fiber.yield(): any incoming message might result in one or more
    -- fibers becoming ready.
    dispatch(pump, data)
end)
-- Request shutdown of event processing. This only sets the leap._done flag;
-- the idle callback acts on it the next time it runs.
function leap.done()
    leap._done = true
end
-- Called by WaitFor:enable(). Adds waitfor to leap._waitfors and leaves that
-- list sorted in descending order of priority.
--
-- leap._waitfors holds its values weakly, so slots can have been cleared,
-- leaving nil holes. The length operator, table.insert() and table.sort()
-- are only well-defined on a hole-free sequence: with a hole present the
-- sort may hand nil to the comparator or skip trailing entries. So rebuild
-- the list densely before sorting.
--
-- waitfor: the WaitFor object to register; its priority must be comparable
--          with the priorities of the objects already registered.
local function registerWaitFor(waitfor)
    local waitfors = leap._waitfors
    -- Gather the surviving entries into a temporary (strongly referencing)
    -- array, emptying the list as we go. Clearing existing keys during a
    -- pairs() traversal is permitted.
    local live = {}
    for key, entry in pairs(waitfors) do
        live[#live + 1] = entry
        waitfors[key] = nil
    end
    live[#live + 1] = waitfor
    -- keep waitfors sorted in descending order of specified priority
    table.sort(live,
               function (lhs, rhs) return lhs.priority > rhs.priority end)
    -- write the dense, sorted sequence back into the same table object
    for i, entry in ipairs(live) do
        waitfors[i] = entry
    end
end
-- Called by WaitFor:disable(). Removes waitfor from leap._waitfors if it is
-- there; does nothing otherwise.
--
-- The entry is removed with table.remove() so later entries shift down and
-- the list remains a proper sequence. Simply assigning nil would leave a
-- hole, which makes the length operator, table.insert() and table.sort()
-- unreliable on this list.
local function unregisterWaitFor(waitfor)
    local waitfors = leap._waitfors
    for i, entry in pairs(waitfors) do
        if entry == waitfor then
            if i <= #waitfors then
                table.remove(waitfors, i)
            else
                -- i lies beyond the length border (the weak-valued list
                -- already has a hole before it), where table.remove()
                -- cannot reach: just clear the slot.
                waitfors[i] = nil
            end
            -- the table was modified, so stop traversing immediately
            break
        end
    end
end
-- ******************************************************************************
-- WaitFor and friends
-- ******************************************************************************
-- An unsolicited event is handled by the highest-priority WaitFor subclass
-- object willing to accept it. If no such object is found, the unsolicited
-- event is discarded.
--
-- * First, instantiate a WaitFor subclass object to register its interest in
-- some incoming event(s). WaitFor instances are self-registering; merely
-- instantiating the object suffices.
-- * Any coroutine may call a given WaitFor object's wait() method. This blocks
-- the calling coroutine until a suitable event arrives.
-- * WaitFor's constructor accepts a float priority. Every incoming event
-- (other than those claimed by request() or generate()) is passed to each
-- extant WaitFor.filter() method in descending priority order. The first
-- such filter() to return nontrivial data claims that event.
-- * At that point, the blocked wait() call on that WaitFor object returns the
-- item returned by filter().
-- * WaitFor contains a queue. Multiple arriving events claimed by that WaitFor
-- object's filter() method are added to the queue. Naturally, until the
-- queue is empty, calling wait() immediately returns the front entry.
--
-- It's reasonable to instantiate a WaitFor subclass whose filter() method
-- unconditionally returns the incoming event, and whose priority places it
-- last in the list. This object will enqueue every unsolicited event left
-- unclaimed by other WaitFor subclass objects.
--
-- It's not strictly necessary to associate a WaitFor object with exactly one
-- coroutine. You might have multiple "worker" coroutines drawing from the same
-- WaitFor object, useful if the work being done per event might itself involve
-- "blocking" operations. Or a given coroutine might sample a number of WaitFor
-- objects in round-robin fashion... etc. etc. Nonetheless, it's
-- straightforward to designate one coroutine for each WaitFor object.
-- --------------------------------- WaitFor ---------------------------------
-- Base "class" table for WaitFor objects. _id is the counter from which
-- default names ('WaitFor' .. id) are generated.
leap.WaitFor = { _id = 0 }

-- String conversion used for WaitFor objects: simply the object's name.
-- (Lua (sub)classes have no name of their own, so it can't be prefixed
-- with one.)
function leap.WaitFor.tostring(self)
    local label = self.name
    return label
end
-- Construct a WaitFor object whose metatable (and __index) is self. The
-- result can be used directly, or as the prototype for further objects,
-- which is how WaitForReqid is derived.
--
-- priority: optional number. When given (0 counts, it is truthy) the new
--           object is enable()d, i.e. registered to receive unsolicited
--           events. When nil/false the object is left unregistered.
-- name:     optional debugging name. When omitted, a default of the form
--           'WaitFor<N>' is generated.
--
-- Returns the new object.
function leap.WaitFor:new(priority, name)
    -- The __tostring field on the object itself serves objects that later
    -- use this one as their metatable (prototype).
    local obj = setmetatable({__tostring=leap.WaitFor.tostring}, self)
    self.__index = self
    -- tostring(obj) consults obj's metatable, which is self, not obj. Make
    -- sure self carries __tostring too, otherwise objects created directly
    -- from a table lacking that field print as a bare table address.
    -- rawget() so that an explicitly assigned __tostring is respected.
    if rawget(self, '__tostring') == nil then
        self.__tostring = leap.WaitFor.tostring
    end
    obj.priority = priority
    if name then
        obj.name = name
    else
        -- Count on the base class rather than on self: a counter kept per
        -- prototype would hand out the same default name more than once.
        leap.WaitFor._id += 1
        obj.name = 'WaitFor' .. leap.WaitFor._id
    end
    obj._queue = ErrorQueue:new()
    obj._registered = false
    -- if no priority, then don't enable() - remember 0 is truthy
    if priority then
        obj:enable()
    end
    return obj
end
-- Register this WaitFor object to receive unsolicited events; used to
-- re-enable a disable()d object. new() already calls this for any object
-- constructed with a priority. Calling it on a registered object is a no-op.
function leap.WaitFor:enable()
    if self._registered then
        return
    end
    registerWaitFor(self)
    self._registered = true
end
-- Unregister an enable()d WaitFor object so it is no longer offered
-- unsolicited events. Calling it on an unregistered object is a no-op.
function leap.WaitFor:disable()
    if not self._registered then
        return
    end
    unregisterWaitFor(self)
    self._registered = false
end
-- Block the calling coroutine until a suitable unsolicited event (one that
-- filter() claimed) is available, then return the queued item. The item
-- comes from this object's queue via Dequeue().
function leap.WaitFor:wait()
    debug('%s about to wait', self.name)
    local received = self._queue:Dequeue()
    debug('%s got %s', self.name, received)
    return received
end
-- Abstract hook: override filter() to examine each incoming event in
-- whatever way makes sense for your WaitFor.
--
-- pump: name of the LLEventPump on which the event arrived
-- data: the event data
--
-- Return nil to ignore the event. To claim it, return the item you want
-- placed in the queue, typically
--     return data
-- or perhaps
--     return {pump=pump, data=data}
-- or some variation.
--
-- The base implementation always raises an error.
function leap.WaitFor:filter(pump, data)
    error('You must override the WaitFor.filter() method')
end
-- Called by unsolicited() for each WaitFor in leap._waitfors (and by
-- dispatch() for a WaitForReqid). Runs the event through filter(); when
-- filter() returns a true value, that item is passed to process() and the
-- event counts as claimed.
--
-- Returns true if this object claimed the event, false otherwise.
function leap.WaitFor:handle(pump, data)
    local claimed = self:filter(pump, data)
    debug('%s.filter() returned %s', self.name, claimed)
    if claimed then
        -- filter() wants this event: queue what it returned
        self:process(claimed)
        return true
    end
    -- the item didn't pass the filter, so we're not interested
    return false
end
-- Called by WaitFor:handle() for an accepted event: appends the item to
-- this object's queue, from which wait() retrieves it.
function leap.WaitFor:process(item)
    local queue = self._queue
    queue:Enqueue(item)
end
-- Called by cleanup() when event processing ends: closes this object's
-- queue.
function leap.WaitFor:close()
    local queue = self._queue
    queue:close()
end
-- Report an error to this WaitFor object: logs a warning naming the object,
-- then passes the error to the queue's Error() method.
-- NOTE(review): nothing in this file calls exception(); it is presumably
-- meant for reporting a get_event_next() failure to waiters -- confirm.
--
-- message: the error value. Lua errors can be any value, so it is converted
--          with tostring() for the log line; otherwise a non-string value
--          would make the concatenation raise before the error reached the
--          queue. The original value is what goes into the queue.
function leap.WaitFor:exception(message)
    LL.print_warning(self.name .. ' error: ' .. tostring(message))
    self._queue:Error(message)
end
-- ------------------------------ WaitForReqid -------------------------------
-- WaitForReqid derives from WaitFor: the prototype is itself a WaitFor
-- object, created without a priority and therefore not registered.
leap.WaitForReqid = leap.WaitFor:new()

-- Construct the WaitForReqid for one request.
--
-- reqid: the request id; it is used only to build the debugging name
--        'WaitForReqid(<reqid>)'.
--
-- Priority is meaningless here, since this object is never added to the
-- priority-sorted leap._waitfors list; nil is passed so that WaitFor:new()
-- does not enable() it.
function leap.WaitForReqid:new(reqid)
    local label = 'WaitForReqid(' .. reqid .. ')'
    local obj = leap.WaitFor:new(nil, label)
    self.__index = self
    return setmetatable(obj, self)
end
-- Accept every event. The WaitForReqid object of interest is looked up
-- directly from the incoming ["reqid"] value, so there is no need to test
-- the event again here; the event data itself is what gets queued.
function leap.WaitForReqid:filter(pump, data)
    local accepted = data
    return accepted
end
return leap
|