diff options
Diffstat (limited to 'indra/newview/scripts/lua/leap.lua')
-rw-r--r-- | indra/newview/scripts/lua/leap.lua | 159 |
1 files changed, 127 insertions, 32 deletions
diff --git a/indra/newview/scripts/lua/leap.lua b/indra/newview/scripts/lua/leap.lua index cfb7377523..8caae24e94 100644 --- a/indra/newview/scripts/lua/leap.lua +++ b/indra/newview/scripts/lua/leap.lua @@ -40,6 +40,7 @@ local fiber = require('fiber') local ErrorQueue = require('ErrorQueue') +local inspect = require('inspect') local function dbg(...) end -- local dbg = require('printf') @@ -74,7 +75,7 @@ local reply, command = LL.get_event_pumps() -- pending is NOT a weak table because the caller of request() or generate() -- never sees the WaitForReqid object. pending holds the only reference, so -- it should NOT be garbage-collected. -pending = {} +local pending = {} -- Our consumer will instantiate some number of WaitFor subclass objects. -- As these are traversed in descending priority order, we must keep -- them in a list. @@ -82,7 +83,7 @@ pending = {} -- to it. Once the consuming script drops the reference, allow Lua to -- garbage-collect the WaitFor despite its entry in waitfors. local weak_values = {__mode='v'} -waitfors = setmetatable({}, weak_values) +local waitfors = setmetatable({}, weak_values) -- It has been suggested that we should use UUIDs as ["reqid"] values, -- since UUIDs are guaranteed unique. However, as the "namespace" for -- ["reqid"] values is our very own reply pump, we can get away with @@ -131,7 +132,7 @@ local function requestSetup(pump, data) local waitfor = leap.WaitForReqid:new(reqid) pending[reqid] = waitfor -- Pass reqid to send() to stamp it into (a copy of) the request data. - dbg('requestSetup(%s, %s)', pump, data) + dbg('requestSetup(%s, %s) storing %s', pump, data, waitfor.name) leap.send(pump, data, reqid) return reqid, waitfor end @@ -171,44 +172,127 @@ function leap.request(pump, data) end -- Send the specified request LLSD, expecting an arbitrary number of replies. --- Each one is yielded on receipt. If you omit checklast, this is an infinite --- generator; it's up to the caller to recognize when the last reply has been --- received, and stop resuming for more. --- --- If you pass checklast=<callable accepting(event)>, each response event is --- passed to that callable (after the yield). When the callable returns --- True, the generator terminates in the usual way. +-- Each one is returned on request. +-- +-- Usage: +-- sequence = leap.generate(pump, data) +-- repeat +-- response = sequence.next() +-- until last(response) +-- (last() means whatever test the caller wants to perform on response) +-- sequence.done() -- -- See request() remarks about ["reqid"]. +-- +-- Note: this seems like a prime use case for Lua coroutines. But in a script +-- using fibers.lua, a "wild" coroutine confuses the fiber scheduler. If +-- generate() were itself a coroutine, it would call WaitForReqid:wait(), +-- which would yield -- thereby resuming generate() WITHOUT waiting. function leap.generate(pump, data, checklast) -- Invent a new, unique reqid. Arrange to handle incoming events -- bearing that reqid. Stamp the outbound request with that reqid, and -- send it. local reqid, waitfor = requestSetup(pump, data) - local ok, response, resumed_with - repeat - ok, response = pcall(waitfor.wait, waitfor) - if (not ok) or response.error then - break + return { + next = function() + dbg('leap.generate(%s).next() about to wait on %s', reqid, tostring(waitfor)) + local ok, response = pcall(waitfor.wait, waitfor) + dbg('leap.generate(%s).next() got %s: %s', reqid, ok, response) + if not ok then + error(response) + elseif response.error then + error(response.error) + else + return response + end + end, + done = function() + -- cleanup consists of removing our WaitForReqid from pending + pending[reqid] = nil end - -- can resume(false) to terminate generate() and clean up - resumed_with = coroutine.yield(response) - until (checklast and checklast(response)) or (resumed_with == false) - -- If we break the above loop, whether or not due to error, clean up. - pending[reqid] = nil - if not ok then - error(response) - elseif response.error then + } +end + +-- Send the specified request LLSD, expecting an immediate reply followed by +-- an arbitrary number of subsequent replies with the same reqid. Block the +-- calling coroutine until the first (immediate) reply, but launch a separate +-- fiber on which to call the passed callback with later replies. +-- +-- Once the callback returns true, the background fiber terminates. +function leap.eventstream(pump, data, callback) + local reqid, waitfor = requestSetup(pump, data) + local response = waitfor:wait() + if response.error then + -- clean up our WaitForReqid + waitfor:close() error(response.error) end + -- No error, so far so good: + -- call the callback with the first response just in case + dbg('leap.eventstream(%s): first callback', reqid) + local ok, done = pcall(callback, response) + dbg('leap.eventstream(%s) got %s, %s', reqid, ok, done) + if not ok then + -- clean up our WaitForReqid + waitfor:close() + error(done) + end + if done then + return response + end + -- callback didn't throw an error, and didn't say stop, + -- so set up to handle subsequent events + -- TODO: distinguish "daemon" fibers that can be terminated even if waiting + fiber.launch( + pump, + function () + local ok, done + local nth = 1 + repeat + event = waitfor:wait() + if not event then + -- wait() returns nil once the queue is closed (e.g. cancelreq()) + ok, done = true, true + else + nth += 1 + dbg('leap.eventstream(%s): callback %d', reqid, nth) + ok, done = pcall(callback, event) + dbg('leap.eventstream(%s) got %s, %s', reqid, ok, done) + end + -- not ok means callback threw an error (caught as 'done') + -- done means callback succeeded but wants to stop + until (not ok) or done + -- once we break this loop, clean up our WaitForReqid + waitfor:close() + if not ok then + -- can't reflect the error back to our caller + LL.print_warning(fiber.get_name() .. ': ' .. done) + end + end) + return response +end + +-- we might want to clean up after leap.eventstream() even if the callback has +-- not yet returned true +function leap.cancelreq(reqid) + dbg('cancelreq(%s)', reqid) + local waitfor = pending[reqid] + if waitfor ~= nil then + -- close() removes the pending entry and also closes the queue, + -- breaking the background fiber's wait loop. + dbg('cancelreq(%s) canceling %s', reqid, waitfor.name) + waitfor:close() + end end local function cleanup(message) - -- we're done: clean up all pending coroutines - for i, waitfor in pairs(pending) do + -- We're done: clean up all pending coroutines. + -- Iterate over copies of the pending and waitfors tables, since the + -- close() operation modifies the real tables. + for i, waitfor in pairs(table.clone(pending)) do waitfor:close() end - for i, waitfor in pairs(waitfors) do + for i, waitfor in pairs(table.clone(waitfors)) do waitfor:close() end end @@ -223,7 +307,8 @@ local function unsolicited(pump, data) return end end - LL.print_debug(string.format('unsolicited(%s, %s) discarding unclaimed event', pump, data)) + LL.print_debug(string.format('unsolicited(%s, %s) discarding unclaimed event', + pump, inspect(data))) end -- Route incoming (pump, data) event to the appropriate waiting coroutine. @@ -231,14 +316,17 @@ local function dispatch(pump, data) local reqid = data['reqid'] -- if the response has no 'reqid', it's not from request() or generate() if reqid == nil then +-- dbg('dispatch() found no reqid; calling unsolicited(%s, %s)', pump, data) return unsolicited(pump, data) end -- have reqid; do we have a WaitForReqid? local waitfor = pending[reqid] if waitfor == nil then +-- dbg('dispatch() found no WaitForReqid(%s); calling unsolicited(%s, %s)', reqid, pump, data) return unsolicited(pump, data) end -- found the right WaitForReqid object, let it handle the event +-- dbg('dispatch() calling %s.handle(%s, %s)', waitfor.name, pump, data) waitfor:handle(pump, data) end @@ -284,11 +372,9 @@ end -- called by WaitFor.disable() local function unregisterWaitFor(waitfor) - for i, w in pairs(waitfors) do - if w == waitfor then - waitfors[i] = nil - break - end + local i = table.find(waitfors, waitfor) + if i ~= nil then + waitfors[i] = nil end end @@ -417,6 +503,7 @@ end -- called by cleanup() at end function leap.WaitFor:close() + self:disable() self._queue:close() end @@ -437,6 +524,8 @@ function leap.WaitForReqid:new(reqid) setmetatable(obj, self) self.__index = self + obj.reqid = reqid + return obj end @@ -447,4 +536,10 @@ function leap.WaitForReqid:filter(pump, data) return data end +function leap.WaitForReqid:close() + -- remove this entry from pending table + pending[self.reqid] = nil + self._queue:close() +end + return leap |