From 0566af988790e95414ed18cd82206710094d8fae Mon Sep 17 00:00:00 2001 From: Nat Goodspeed Date: Thu, 21 Mar 2024 23:56:46 +0900 Subject: WIP: Add fiber.lua module and use in leap.lua and WaitQueue.lua. fiber.lua goes beyond coro.lua in that it distinguishes ready suspended coroutines from waiting suspended coroutines, and presents a rudimentary scheduler in fiber.yield(). yield() can determine that when all coroutines are waiting, it's time to retrieve the next incoming event from the viewer. Moreover, it can detect when all coroutines have completed and exit without being explicitly told. fiber.launch() associates a name with each fiber for debugging purposes. fiber.get_name() retrieves the name of the specified fiber, or the running fiber. fiber.status() is like coroutine.status(), but can return 'ready' or 'waiting' instead of 'suspended'. fiber.yield() leaves the calling fiber ready, but lets other ready fibers run. fiber.wait() suspends the calling fiber and lets other ready fibers run. fiber.wake(), called from some other coroutine, returns the passed fiber to ready status for a future call to fiber.yield(). fiber.run() drives the scheduler to run all fibers to completion. If, on completion of the subject Lua script, LuaState::expr() detects that the script loaded fiber.lua, it calls fiber.run() to finish running any dangling fibers. This lets a script make calls to fiber.launch() and then just fall off the end, leaving the implicit fiber.run() call to run them all. fiber.lua is designed to allow the main thread, as well as explicitly launched coroutines, to make leap.request() calls. This part still needs debugging. The leap.lua module now configures a fiber.set_idle() function that honors leap.done(), but calls get_event_next() and dispatches the next incoming event. leap.request() and generate() now leave the reqid stamp in the response. This lets a caller handle subsequent events with the same reqid, e.g. for LLLuaFloater. Remove leap.process(): it has been superseded by fiber.run(). Remove leap.WaitFor:iterate(): unfortunately that would run afoul of the Luau bug that prevents suspending the calling coroutine within a generic 'for' iterator function. Make leap.lua use weak tables to track WaitFor objects. Make WaitQueue:Dequeue() call fiber.wait() to suspend its caller when the queue is empty, and Enqueue() call fiber.wake() to set it ready again when a new item is pushed. Make llluamanager_test.cpp's leap test script use the fiber module to launch coroutines, instead of the coro module. Fix a bug in which its drain() function was inadvertently setting and testing the global 'item' variable instead of one local to the function. Since some other modules had the same bug, it was getting confused. Also add printf.lua, providing a printf() function. printf() is short for print(string.format()), but it can also print tables: anything not a number or string is formatted using the inspect() function. Clean up some LL_DEBUGS() output left over from debugging lua_tollsd(). --- indra/newview/scripts/lua/WaitQueue.lua | 29 ++- indra/newview/scripts/lua/fiber.lua | 301 ++++++++++++++++++++++++++++++++ indra/newview/scripts/lua/leap.lua | 195 ++++++++++----------- indra/newview/scripts/lua/printf.lua | 19 ++ 4 files changed, 428 insertions(+), 116 deletions(-) create mode 100644 indra/newview/scripts/lua/fiber.lua create mode 100644 indra/newview/scripts/lua/printf.lua (limited to 'indra/newview/scripts/lua') diff --git a/indra/newview/scripts/lua/WaitQueue.lua b/indra/newview/scripts/lua/WaitQueue.lua index 00766ccae7..b15e9c443b 100644 --- a/indra/newview/scripts/lua/WaitQueue.lua +++ b/indra/newview/scripts/lua/WaitQueue.lua @@ -2,8 +2,12 @@ -- the Dequeue() operation blocks the calling coroutine until some other -- coroutine Enqueue()s a new value. +local fiber = require('fiber') local Queue = require('Queue') +-- local debug = print_debug +local function debug(...) end + local WaitQueue = Queue:new() function WaitQueue:new() @@ -32,11 +36,9 @@ function WaitQueue:_wake_waiters() -- cases. With multiple consumers, if more than one is trying to -- Dequeue() from an empty WaitQueue, we'll have multiple waiters. -- Unlike OS threads, with cooperative concurrency it doesn't make sense - -- to "notify all": we need resume only one of the waiting Dequeue() - -- callers. But since resuming that caller might entail either Enqueue() - -- or Dequeue() calls, recheck every time around to see if we must resume - -- another waiting coroutine. - while not self:IsEmpty() and #self._waiters > 0 do + -- to "notify all": we need wake only one of the waiting Dequeue() + -- callers. + if not self:IsEmpty() and next(self._waiters) then -- Pop the oldest waiting coroutine instead of the most recent, for -- more-or-less round robin fairness. But skip any coroutines that -- have gone dead in the meantime. @@ -47,11 +49,7 @@ function WaitQueue:_wake_waiters() -- do we still have at least one waiting coroutine? if waiter then -- don't pass the head item: let the resumed coroutine retrieve it - local ok, message = coroutine.resume(waiter) - -- if resuming that waiter encountered an error, don't swallow it - if not ok then - error(message) - end + fiber.wake(waiter) end end end @@ -62,18 +60,17 @@ function WaitQueue:Dequeue() -- the queue while there are still items left, and we want the -- consumer(s) to retrieve those last few items. if self._closed then + debug('WaitQueue:Dequeue(): closed') return nil end - local coro = coroutine.running() - if coro == nil then - error("WaitQueue:Dequeue() trying to suspend main coroutine") - end + debug('WaitQueue:Dequeue(): waiting') -- add the running coroutine to the list of waiters - table.insert(self._waiters, coro) + table.insert(self._waiters, fiber.running()) -- then let somebody else run - coroutine.yield() + fiber.wait() end -- here we're sure this queue isn't empty + debug('WaitQueue:Dequeue() calling Queue.Dequeue()') return Queue.Dequeue(self) end diff --git a/indra/newview/scripts/lua/fiber.lua b/indra/newview/scripts/lua/fiber.lua new file mode 100644 index 0000000000..f18d133cc8 --- /dev/null +++ b/indra/newview/scripts/lua/fiber.lua @@ -0,0 +1,301 @@ +-- Organize Lua coroutines into fibers. + +-- In this usage, the difference between coroutines and fibers is that fibers +-- have a scheduler. Yielding a fiber means allowing other fibers, plural, to +-- run: it's more than just returning control to the specific Lua thread that +-- resumed the running coroutine. + +-- fiber.launch() creates a new fiber ready to run. +-- fiber.status() reports (augmented) status of the passed fiber: instead of +-- 'suspended', it returns either 'ready' or 'waiting' +-- fiber.yield() allows other fibers to run, but leaves the calling fiber +-- ready to run. +-- fiber.wait() marks the running fiber not ready, and resumes other fibers. +-- fiber.wake() marks the designated suspended fiber ready to run, but does +-- not yet resume it. +-- fiber.run() runs all current fibers until all have terminated (successfully +-- or with an error). + +local printf = require 'printf' +-- local debug = printf +local function debug(...) end +local coro = require 'coro' + +local fiber = {} + +-- The tables in which we track fibers must have weak keys so dead fibers +-- can be garbage-collected. +local weak_values = {__mode='v'} +local weak_keys = {__mode='k'} + +-- Track each current fiber as being either ready to run or not ready +-- (waiting). wait() moves the running fiber from ready to waiting; wake() +-- moves the designated fiber from waiting back to ready. +-- The ready table is used as a list so yield() can go round robin. +local ready = setmetatable({'main'}, weak_keys) +-- The waiting table is used as a set because order doesn't matter. +local waiting = setmetatable({}, weak_keys) + +-- Every fiber has a name, for diagnostic purposes. Names must be unique. +-- A colliding name will be suffixed with an integer. +-- Predefine 'main' with our marker so nobody else claims that name. +local names = setmetatable({main='main'}, weak_keys) +local byname = setmetatable({main='main'}, weak_values) +-- each colliding name has its own distinct suffix counter +local suffix = {} + +-- Specify a nullary idle() callback to be called whenever there are no ready +-- fibers but there are waiting fibers. The idle() callback is responsible for +-- changing zero or more waiting fibers to ready fibers by calling +-- fiber.wake(), although a given call may leave them all still waiting. +-- When there are no ready fibers, it's a good idea for the idle() function to +-- return control to a higher-level execution agent. Simply returning without +-- changing any fiber's status will spin the CPU. +-- The idle() callback can return non-nil to exit fiber.run() with that value. +function fiber._idle() + error('fiber.yield(): you must first call set_idle(nullary idle() function)') +end + +function fiber.set_idle(func) + fiber._idle = func +end + +-- Launch a new Lua fiber, ready to run. +function fiber.launch(name, func, ...) + local args = table.pack(...) + local co = coroutine.create(function() func(table.unpack(args)) end) + -- a new fiber is ready to run + table.insert(ready, co) + local namekey = name + while byname[namekey] do + if not suffix[name] then + suffix[name] = 1 + end + suffix[name] += 1 + namekey = name .. tostring(suffix[name]) + end + -- found a namekey not yet in byname: set it + byname[namekey] = co + -- and remember it as this fiber's name + names[co] = namekey +-- debug('launch(%s)', namekey) +-- debug('byname[%s] = %s', namekey, tostring(byname[namekey])) +-- debug('names[%s] = %s', tostring(co), names[co]) +-- debug('ready[-1] = %s', tostring(ready[#ready])) +end + +-- for debugging +function fiber.print_all() + print('Ready fibers:' .. if next(ready) then '' else ' none') + for _, co in pairs(ready) do + printf(' %s: %s', fiber.get_name(co), fiber.status(co)) + end + print('Waiting fibers:' .. if next(waiting) then '' else ' none') + for co in pairs(waiting) do + printf(' %s: %s', fiber.get_name(co), fiber.status(co)) + end +end + +-- return either the running coroutine or, if called from the main thread, +-- 'main' +function fiber.running() + return coroutine.running() or 'main' +end + +-- Query a fiber's name (nil for the running fiber) +function fiber.get_name(co) + if not co then + co = fiber.running() + end + if not names[co] then + return 'unknown' + end + return names[co] +end + +-- Query status of the passed fiber +function fiber.status(co) + local running = coroutine.running() + if (not co) or co == running then + -- silly to ask the status of the running fiber: it's 'running' + return 'running' + end + if co ~= 'main' then + -- for any coroutine but main, consult coroutine.status() + local status = coroutine.status(co) + if status ~= 'suspended' then + return status + end + -- here co is suspended, answer needs further refinement + else + -- co == 'main' + if not running then + -- asking about 'main' from the main fiber + return 'running' + end + -- asking about 'main' from some other fiber, so presumably main is suspended + end + -- here we know co is suspended -- but is it ready to run? + if waiting[co] then + return 'waiting' + end + -- not waiting should imply ready: sanity check + for _, maybe in pairs(ready) do + if maybe == co then + return 'ready' + end + end + -- Calls within yield() between popping the next ready fiber and + -- re-appending it to the list are in this state. Once we're done + -- debugging yield(), we could reinstate either of the below. +-- error(string.format('fiber.status(%s) is stumped', fiber.get_name(co))) +-- print(string.format('*** fiber.status(%s) is stumped', fiber.get_name(co))) + return '(unknown)' +end + +-- change the running fiber's status to waiting +local function set_waiting() + -- if called from the main fiber, inject a 'main' marker into the list + co = fiber.running() + -- delete from ready list + for i, maybe in pairs(ready) do + if maybe == co then + table.remove(ready, i) + break + end + end + -- add to waiting list + waiting[co] = true +end + +-- Suspend the current fiber until some other fiber calls fiber.wake() on it +function fiber.wait() + set_waiting() + -- now yield to other fibers + fiber.yield() +end + +-- Mark a suspended fiber as being ready to run +function fiber.wake(co) + if not waiting[co] then + error(string.format('fiber.wake(%s) but status=%s, ready=%s, waiting=%s', + names[co], fiber.status(co), ready[co], waiting[co])) + end + -- delete from waiting list + waiting[co] = nil + -- add to end of ready list + table.insert(ready, co) + -- but don't yet resume it: that happens next time we reach yield() +end + +-- Run fibers until all but main have terminated: return nil. +-- Or until configured idle() callback returns x ~= nil: return x. +function fiber.run() + -- A fiber calling run() is not also doing other useful work. Tell yield() + -- that we're waiting. Otherwise it would keep seeing that our caller is + -- ready and return to us, instead of realizing that all coroutines are + -- waiting and call idle(). + set_waiting() + local others, idle_done + repeat + debug('%s calling fiber.run() calling yield()', fiber.get_name()) + others, idle_done = fiber.yield() + debug("%s fiber.run()'s yield() returned %s, %s", fiber.get_name(), + tostring(others), tostring(idle_done)) + until (not others) + debug('%s fiber.run() done', fiber.get_name()) + fiber.wake(fiber.running()) + -- Once there are no more waiting fibers, and the only ready fiber is + -- main, return to main. All previously-launched fibers are done. Possibly + -- the chunk is done, or the chunk may decide to launch a new batch of + -- fibers. + return idle_done +end + +-- pop and return the next not-dead fiber in the ready list, or nil if none remain +local function live_ready_iter() + -- don't write + -- for co in table.remove, ready, 1 + -- because it would keep passing a new second parameter! + for co in function() return table.remove(ready, 1) end do + debug('%s live_ready_iter() sees %s, status %s', + fiber.get_name(), fiber.get_name(co), fiber.status(co)) + -- keep removing the head entry until we find one that's not dead, + -- discarding any dead coroutines along the way + if co == 'main' or coroutine.status(co) ~= 'dead' then + debug('%s live_ready_iter() returning %s', + fiber.get_name(), fiber.get_name(co)) + return co + end + end + debug('%s live_ready_iter() returning nil', fiber.get_name()) + return nil +end + +-- prune the set of waiting fibers +local function prune_waiting() + for waiter in pairs(waiting) do + if waiter ~= 'main' and coroutine.status(waiter) == 'dead' then + waiting[waiter] = nil + end + end +end + +-- Give other ready fibers a chance to run, leaving this one ready, returning +-- after a cycle. Returns: +-- * true, nil if there remain other live fibers, whether ready or waiting +-- * false, nil if this is the only remaining fiber +-- * nil, x if configured idle() callback returned non-nil x +function fiber.yield() + if coroutine.running() then + -- seize the opportunity to make sure the viewer isn't shutting down +-- check_stop() + -- this is a real coroutine, yield normally to main or whoever + coroutine.yield() + -- main certainly still exists + return true + end + + -- This is the main fiber: coroutine.yield() doesn't work. + -- Instead, resume each of the ready fibers. + -- Prune the set of waiting fibers after every time fiber business logic + -- runs (i.e. other fibers might have terminated or hit error), such as + -- here on entry. + prune_waiting() + local others, idle_stop + repeat + for co in live_ready_iter do + -- seize the opportunity to make sure the viewer isn't shutting down +-- check_stop() + -- before we re-append co, is it the only remaining entry? + others = next(ready) + -- co is live, re-append it to the ready list + table.insert(ready, co) + if co == 'main' then + -- Since we know the caller is the main fiber, it's our turn. + -- Tell caller if there are other ready or waiting fibers. + return others or next(waiting) + end + -- not main, but some other ready coroutine: + -- use coro.resume() so we'll propagate any error encountered + coro.resume(co) + prune_waiting() + end + -- Here there are no ready fibers. Are there any waiting fibers? + if not next(waiting) then + return false + end + -- there are waiting fibers: call consumer's configured idle() function + idle_stop = fiber._idle() + if idle_stop ~= nil then + return nil, idle_stop + end + prune_waiting() + -- loop "forever", that is, until: + -- * main is ready, or + -- * there are neither ready fibers nor waiting fibers, or + -- * fiber._idle() returned non-nil + until false +end + +return fiber diff --git a/indra/newview/scripts/lua/leap.lua b/indra/newview/scripts/lua/leap.lua index 81728e7230..60e8266a76 100644 --- a/indra/newview/scripts/lua/leap.lua +++ b/indra/newview/scripts/lua/leap.lua @@ -38,7 +38,10 @@ -- leap.process(). process() won't notice until the next event from the -- viewer, though. +local fiber = require('fiber') local ErrorQueue = require('ErrorQueue') +-- local debug = require('printf') +local function debug(...) end local leap = {} @@ -68,11 +71,13 @@ leap._reply, leap._command = get_event_pumps() -- later one. That means that no incoming event will ever be given to -- the old WaitForReqid object. Any coroutine waiting on the discarded -- WaitForReqid object would therefore wait forever. -leap._pending = {} +-- these are weak values tables +local weak_values = {__mode='v'} +leap._pending = setmetatable({}, weak_values) -- Our consumer will instantiate some number of WaitFor subclass objects. -- As these are traversed in descending priority order, we must keep -- them in a list. -leap._waitfors = {} +leap._waitfors = setmetatable({}, weak_values) -- It has been suggested that we should use UUIDs as ["reqid"] values, -- since UUIDs are guaranteed unique. However, as the "namespace" for -- ["reqid"] values is our very own _reply pump, we can get away with @@ -91,15 +96,13 @@ function leap.cmdpump() return leap._command end --- local inspect = require('inspect') - -- Fire and forget. Send the specified request LLSD, expecting no reply. -- In fact, should the request produce an eventual reply, it will be -- treated as an unsolicited event. -- -- See also request(), generate(). function leap.send(pump, data, reqid) --- print_debug('leap.send('..pump..', '..inspect(data)..', '..reqid..') entry') + debug('leap.send(%s, %s, %s) entry', pump, data, reqid) local data = data if type(data) == 'table' then data = table.clone(data) @@ -108,10 +111,26 @@ function leap.send(pump, data, reqid) data['reqid'] = reqid end end --- print_debug('leap.send('..pump..', '..inspect(data)..') calling post_on()') + debug('leap.send(%s, %s) calling post_on()', pump, data) post_on(pump, data) end +-- common setup code shared by request() and generate() +local function requestSetup(pump, data) + -- invent a new, unique reqid + leap._reqid += 1 + local reqid = leap._reqid + -- Instantiate a new WaitForReqid object. The priority is irrelevant + -- because, unlike the WaitFor base class, WaitForReqid does not + -- self-register on our leap._waitfors list. Instead, capture the new + -- WaitForReqid object in leap._pending so dispatch() can find it. + leap._pending[reqid] = leap.WaitForReqid:new(reqid) + -- Pass reqid to send() to stamp it into (a copy of) the request data. + debug('requestSetup(%s, %s)', pump, data) + leap.send(pump, data, reqid) + return reqid +end + -- Send the specified request LLSD, expecting exactly one reply. Block -- the calling coroutine until we receive that reply. -- @@ -131,39 +150,20 @@ end -- -- See also send(), generate(). function leap.request(pump, data) - local reqid = leap._requestSetup(pump, data) + local reqid = requestSetup(pump, data) local waitfor = leap._pending[reqid] --- print_debug('leap.request('..tostring(pump)..', '..inspect(data)..') about to wait on '.. --- tostring(waitfor)) + debug('leap.request(%s, %s) about to wait on %s', pump, data, tostring(waitfor)) local ok, response = pcall(waitfor.wait, waitfor) --- print_debug('leap.request('..tostring(pump)..', '..inspect(data)..') got '.. --- tostring(ok)..': '..inspect(response)) + debug('leap.request(%s, %s) got %s: %s', pump, data, ok, response) -- kill off temporary WaitForReqid object, even if error leap._pending[reqid] = nil if ok then - response.reqid = nil return response else error(response) end end --- common setup code shared by request() and generate() -function leap._requestSetup(pump, data) - -- invent a new, unique reqid - leap._reqid += 1 - local reqid = leap._reqid - -- Instantiate a new WaitForReqid object. The priority is irrelevant - -- because, unlike the WaitFor base class, WaitForReqid does not - -- self-register on our leap._waitfors list. Instead, capture the new - -- WaitForReqid object in leap._pending so _dispatch() can find it. - leap._pending[reqid] = leap.WaitForReqid:new(reqid) - -- Pass reqid to send() to stamp it into (a copy of) the request data. --- print_debug('leap._requestSetup('..tostring(pump)..', '..inspect(data)..')') - leap.send(pump, data, reqid) - return reqid -end - -- Send the specified request LLSD, expecting an arbitrary number of replies. -- Each one is yielded on receipt. If you omit checklast, this is an infinite -- generator; it's up to the caller to recognize when the last reply has been @@ -178,7 +178,7 @@ function leap.generate(pump, data, checklast) -- Invent a new, unique reqid. Arrange to handle incoming events -- bearing that reqid. Stamp the outbound request with that reqid, and -- send it. - local reqid = leap._requestSetup(pump, data) + local reqid = requestSetup(pump, data) local waitfor = leap._pending[reqid] local ok, response repeat @@ -186,7 +186,6 @@ function leap.generate(pump, data, checklast) if not ok then break end - response.reqid = nil coroutine.yield(response) until checklast and checklast(response) -- If we break the above loop, whether or not due to error, clean up. @@ -196,78 +195,79 @@ function leap.generate(pump, data, checklast) end end --- Kick off response processing. The calling script must create and resume one --- or more coroutines to perform viewer requests using send(), request() or --- generate() before calling this function to handle responses. --- --- While waiting for responses from the viewer, the C++ coroutine running the --- calling Lua script is blocked: no other Lua coroutine is running. -function leap.process() - leap._done = false - local ok, pump, data - while not leap._done do --- print_debug('leap.process() calling get_event_next()') - ok, pump, data = pcall(get_event_next) --- print_debug('leap.process() got '..tostring(ok)..': '..pump..', '..inspect(data)) - -- ok false means get_event_next() raised a Lua error - -- data nil means get_event_next() returned (pump, LLSD()) to indicate done - if not (ok and data) then - break - end - leap._dispatch(pump, data) - end --- print_debug('leap.process() done') +local function cleanup(message) -- we're done: clean up all pending coroutines - -- if ok, then we're just done. - -- if not ok, then 'pump' is actually the error message. - message = if ok then 'done' else pump for i, waitfor in pairs(leap._pending) do - waitfor:_exception(message) + waitfor:exception(message) end for i, waitfor in pairs(leap._waitfors) do - waitfor:_exception(message) - end - -- now that we're done with cleanup, propagate the error we caught above - if not ok then - error(pump) + waitfor:exception(message) end end -function leap.done() - leap._done = true +-- Handle an incoming (pump, data) event with no recognizable ['reqid'] +local function unsolicited(pump, data) + -- we maintain waitfors in descending priority order, so the first waitfor + -- to claim this event is the one with the highest priority + for i, waitfor in pairs(leap._waitfors) do + debug('unsolicited() checking %s', waitfor.name) + if waitfor:handle(pump, data) then + return + end + end + print_debug(string.format('unsolicited(%s, %s) discarding unclaimed event', pump, data)) end -- Route incoming (pump, data) event to the appropriate waiting coroutine. -function leap._dispatch(pump, data) +local function dispatch(pump, data) local reqid = data['reqid'] -- if the response has no 'reqid', it's not from request() or generate() if reqid == nil then - return leap._unsolicited(pump, data) + return unsolicited(pump, data) end -- have reqid; do we have a WaitForReqid? local waitfor = leap._pending[reqid] if waitfor == nil then - return leap._unsolicited(pump, data) + return unsolicited(pump, data) end -- found the right WaitForReqid object, let it handle the event - data['reqid'] = nil - waitfor:_handle(pump, data) + waitfor:handle(pump, data) end --- Handle an incoming (pump, data) event with no recognizable ['reqid'] -function leap._unsolicited(pump, data) - -- we maintain waitfors in descending priority order, so the first waitfor - -- to claim this event is the one with the highest priority - for i, waitfor in pairs(leap._waitfors) do - if waitfor:_handle(pump, data) then - return - end +-- We configure fiber.set_idle() function. fiber.yield() calls the configured +-- idle callback whenever there are waiting fibers but no ready fibers. In +-- our case, that means it's time to fetch another incoming viewer event. +fiber.set_idle(function () + -- If someone has called leap.done(), then tell fiber.yield() to break loop. + if leap._done then + cleanup('done') + return 'done' + end + debug('leap.idle() calling get_event_next()') + local ok, pump, data = pcall(get_event_next) + debug('leap.idle() got %s: %s, %s', ok, pump, data) + -- ok false means get_event_next() raised a Lua error, pump is message + if not ok then + cleanup(pump) + error(pump) + end + -- data nil means get_event_next() returned (pump, LLSD()) to indicate done + if not data then + cleanup('end') + return 'end' end --- print_debug('_unsolicited(', pump, ', ', data, ') discarding unclaimed event') + -- got a real pump, data pair + dispatch(pump, data) + -- return to fiber.yield(): any incoming message might result in one or + -- more fibers becoming ready +end) + +function leap.done() + leap._done = true end -- called by WaitFor.enable() -function leap._registerWaitFor(waitfor) +local function registerWaitFor(waitfor) table.insert(leap._waitfors, waitfor) -- keep waitfors sorted in descending order of specified priority table.sort(leap._waitfors, @@ -275,7 +275,7 @@ function leap._registerWaitFor(waitfor) end -- called by WaitFor.disable() -function leap._unregisterWaitFor(waitfor) +local function unregisterWaitFor(waitfor) for i, w in pairs(leap._waitfors) do if w == waitfor then leap._waitfors[i] = nil @@ -322,8 +322,13 @@ end -- --------------------------------- WaitFor --------------------------------- leap.WaitFor = { _id=0 } +function leap.WaitFor.tostring(self) + -- Lua (sub)classes have no name; can't prefix with that + return self.name +end + function leap.WaitFor:new(priority, name) - local obj = setmetatable({}, self) + local obj = setmetatable({__tostring=leap.WaitFor.tostring}, self) self.__index = self obj.priority = priority @@ -343,16 +348,11 @@ function leap.WaitFor:new(priority, name) return obj end -function leap.WaitFor.tostring(self) - -- Lua (sub)classes have no name; can't prefix with that - return self.name -end - -- Re-enable a disable()d WaitFor object. New WaitFor objects are -- enable()d by default. function leap.WaitFor:enable() if not self._registered then - leap._registerWaitFor(self) + registerWaitFor(self) self._registered = true end end @@ -360,7 +360,7 @@ end -- Disable an enable()d WaitFor object. function leap.WaitFor:disable() if self._registered then - leap._unregisterWaitFor(self) + unregisterWaitFor(self) self._registered = false end end @@ -368,18 +368,12 @@ end -- Block the calling coroutine until a suitable unsolicited event (one -- for which filter() returns the event) arrives. function leap.WaitFor:wait() --- print_debug(self.name .. ' about to wait') - item = self._queue:Dequeue() --- print_debug(self.name .. ' got ', item) + debug('%s about to wait', self.name) + local item = self._queue:Dequeue() + debug('%s got %s', self.name, item) return item end --- Loop over wait() calls. -function leap.WaitFor:iterate() - -- on each iteration, call self.wait(self) - return self.wait, self, nil -end - -- Override filter() to examine the incoming event in whatever way -- makes sense. -- @@ -395,9 +389,10 @@ function leap.WaitFor:filter(pump, data) error('You must override the WaitFor.filter() method') end --- called by leap._unsolicited() for each WaitFor in leap._waitfors -function leap.WaitFor:_handle(pump, data) - item = self:filter(pump, data) +-- called by unsolicited() for each WaitFor in leap._waitfors +function leap.WaitFor:handle(pump, data) + local item = self:filter(pump, data) + debug('%s.filter() returned %s', self.name, item) -- if this item doesn't pass the filter, we're not interested if not item then return false @@ -407,13 +402,13 @@ function leap.WaitFor:_handle(pump, data) return true end --- called by WaitFor:_handle() for an accepted event +-- called by WaitFor:handle() for an accepted event function leap.WaitFor:process(item) self._queue:Enqueue(item) end -- called by leap.process() when get_event_next() raises an error -function leap.WaitFor:_exception(message) +function leap.WaitFor:exception(message) print_warning(self.name .. ' error: ' .. message) self._queue:Error(message) end diff --git a/indra/newview/scripts/lua/printf.lua b/indra/newview/scripts/lua/printf.lua new file mode 100644 index 0000000000..584cd4f391 --- /dev/null +++ b/indra/newview/scripts/lua/printf.lua @@ -0,0 +1,19 @@ +-- printf(...) is short for print(string.format(...)) + +local inspect = require 'inspect' + +local function printf(...) + -- string.format() only handles numbers and strings. + -- Convert anything else to string using the inspect module. + local args = {} + for _, arg in pairs(table.pack(...)) do + if type(arg) == 'number' or type(arg) == 'string' then + table.insert(args, arg) + else + table.insert(args, inspect(arg)) + end + end + print(string.format(table.unpack(args))) +end + +return printf -- cgit v1.2.3 From bb39a8b223f78205a10ffcb61e3b3bfe05b3fd1a Mon Sep 17 00:00:00 2001 From: Nat Goodspeed Date: Fri, 22 Mar 2024 21:04:48 +0900 Subject: Fix a couple bugs in fiber.lua machinery. This fixes a hang if the Lua script explicitly calls fiber.run() before LuaState::expr()'s implicit fiber.run() call. Make fiber.run() remove the calling fiber from the ready list to avoid an infinite loop when all other fibers have terminated: "You're ready!" "Okay, yield()." "You're ready again!" ... But don't claim it's waiting, either, because then when all other fibers have terminated, we'd call idle() in the vain hope that something would make that one last fiber ready. WaitQueue:_wake_waiters() needs to wake waiting fibers if the queue's not empty OR it's been closed. Introduce leap.WaitFor:close() to close the queue gracefully so that a looping waiter can terminate, instead of using WaitFor:exception(), which stops the whole script once it propagates. Make leap's cleanup() function call close(). Streamline fiber.get_name() by using 'or' instead of if ... then. Streamline fiber.status() and fiber.set_waiting() by using table.find() instead of a loop. --- indra/newview/scripts/lua/ErrorQueue.lua | 4 +++ indra/newview/scripts/lua/WaitQueue.lua | 2 +- indra/newview/scripts/lua/fiber.lua | 42 +++++++++++++++----------------- indra/newview/scripts/lua/leap.lua | 9 +++++-- 4 files changed, 31 insertions(+), 26 deletions(-) (limited to 'indra/newview/scripts/lua') diff --git a/indra/newview/scripts/lua/ErrorQueue.lua b/indra/newview/scripts/lua/ErrorQueue.lua index a6d4470044..076742815a 100644 --- a/indra/newview/scripts/lua/ErrorQueue.lua +++ b/indra/newview/scripts/lua/ErrorQueue.lua @@ -3,18 +3,22 @@ -- raise that error. local WaitQueue = require('WaitQueue') +-- local debug = require('printf') +local function debug(...) end local ErrorQueue = WaitQueue:new() function ErrorQueue:Error(message) -- Setting Error() is a marker, like closing the queue. Once we reach the -- error, every subsequent Dequeue() call will raise the same error. + debug('Setting self._closed to %q', message) self._closed = message self:_wake_waiters() end function ErrorQueue:Dequeue() local value = WaitQueue.Dequeue(self) + debug('ErrorQueue:Dequeue: base Dequeue() got %s', value) if value ~= nil then -- queue not yet closed, show caller return value diff --git a/indra/newview/scripts/lua/WaitQueue.lua b/indra/newview/scripts/lua/WaitQueue.lua index b15e9c443b..f69baff09b 100644 --- a/indra/newview/scripts/lua/WaitQueue.lua +++ b/indra/newview/scripts/lua/WaitQueue.lua @@ -38,7 +38,7 @@ function WaitQueue:_wake_waiters() -- Unlike OS threads, with cooperative concurrency it doesn't make sense -- to "notify all": we need wake only one of the waiting Dequeue() -- callers. - if not self:IsEmpty() and next(self._waiters) then + if ((not self:IsEmpty()) or self._closed) and next(self._waiters) then -- Pop the oldest waiting coroutine instead of the most recent, for -- more-or-less round robin fairness. But skip any coroutines that -- have gone dead in the meantime. diff --git a/indra/newview/scripts/lua/fiber.lua b/indra/newview/scripts/lua/fiber.lua index f18d133cc8..8ed99f12b7 100644 --- a/indra/newview/scripts/lua/fiber.lua +++ b/indra/newview/scripts/lua/fiber.lua @@ -104,13 +104,7 @@ end -- Query a fiber's name (nil for the running fiber) function fiber.get_name(co) - if not co then - co = fiber.running() - end - if not names[co] then - return 'unknown' - end - return names[co] + return names[co or fiber.running()] or 'unknown' end -- Query status of the passed fiber @@ -140,10 +134,8 @@ function fiber.status(co) return 'waiting' end -- not waiting should imply ready: sanity check - for _, maybe in pairs(ready) do - if maybe == co then - return 'ready' - end + if table.find(ready, co) then + return 'ready' end -- Calls within yield() between popping the next ready fiber and -- re-appending it to the list are in this state. Once we're done @@ -158,11 +150,9 @@ local function set_waiting() -- if called from the main fiber, inject a 'main' marker into the list co = fiber.running() -- delete from ready list - for i, maybe in pairs(ready) do - if maybe == co then - table.remove(ready, i) - break - end + local i = table.find(ready, co) + if i then + table.remove(ready, i) end -- add to waiting list waiting[co] = true @@ -191,11 +181,16 @@ end -- Run fibers until all but main have terminated: return nil. -- Or until configured idle() callback returns x ~= nil: return x. function fiber.run() - -- A fiber calling run() is not also doing other useful work. Tell yield() - -- that we're waiting. Otherwise it would keep seeing that our caller is - -- ready and return to us, instead of realizing that all coroutines are - -- waiting and call idle(). - set_waiting() + -- A fiber calling run() is not also doing other useful work. Remove the + -- calling fiber from the ready list. Otherwise yield() would keep seeing + -- that our caller is ready and return to us, instead of realizing that + -- all coroutines are waiting and call idle(). But don't say we're + -- waiting, either, because then when all other fibers have terminated + -- we'd call idle() forever waiting for something to make us ready again. + local i = table.find(ready, fiber.running()) + if i then + table.remove(ready, i) + end local others, idle_done repeat debug('%s calling fiber.run() calling yield()', fiber.get_name()) @@ -204,9 +199,10 @@ function fiber.run() tostring(others), tostring(idle_done)) until (not others) debug('%s fiber.run() done', fiber.get_name()) - fiber.wake(fiber.running()) + -- For whatever it's worth, put our own fiber back in the ready list. + table.insert(ready, fiber.running()) -- Once there are no more waiting fibers, and the only ready fiber is - -- main, return to main. All previously-launched fibers are done. Possibly + -- us, return to caller. All previously-launched fibers are done. Possibly -- the chunk is done, or the chunk may decide to launch a new batch of -- fibers. return idle_done diff --git a/indra/newview/scripts/lua/leap.lua b/indra/newview/scripts/lua/leap.lua index 60e8266a76..77f3a3e116 100644 --- a/indra/newview/scripts/lua/leap.lua +++ b/indra/newview/scripts/lua/leap.lua @@ -198,10 +198,10 @@ end local function cleanup(message) -- we're done: clean up all pending coroutines for i, waitfor in pairs(leap._pending) do - waitfor:exception(message) + waitfor:close() end for i, waitfor in pairs(leap._waitfors) do - waitfor:exception(message) + waitfor:close() end end @@ -407,6 +407,11 @@ function leap.WaitFor:process(item) self._queue:Enqueue(item) end +-- called by cleanup() at end +function leap.WaitFor:close() + self._queue:close() +end + -- called by leap.process() when get_event_next() raises an error function leap.WaitFor:exception(message) print_warning(self.name .. ' error: ' .. message) -- cgit v1.2.3 From 2dc003779443db99f46b3db6d17a1954f7b141dd Mon Sep 17 00:00:00 2001 From: Nat Goodspeed Date: Sat, 23 Mar 2024 17:43:07 +0900 Subject: Make leap.request() work even from Lua's main thread. Recast fiber.yield() as internal function scheduler(). Move fiber.run() after it so it can call scheduler() as a local function. Add new fiber.yield() that also calls scheduler(); the added value of this new fiber.yield() over plain scheduler() is that if scheduler() returns before the caller is ready (because the configured set_idle() function returned non-nil), it produces an explicit error rather than returning to its caller. So the caller can assume that when fiber.yield() returns normally, the calling fiber is ready. This allows any fiber, including the main thread, to call fiber.yield() or fiber.wait(). This supports using leap.request(), which posts a request and then waits on a WaitForReqid, which calls ErrorQueue:Dequeue(), which calls fiber.wait(). WaitQueue:_wake_waiters() must call fiber.status() instead of coroutine.status() so it understands the special token 'main'. Add a new llluamanager_test.cpp test to exercise calling leap.request() from Lua's main thread. --- indra/newview/scripts/lua/WaitQueue.lua | 2 +- indra/newview/scripts/lua/fiber.lua | 106 +++++++++++++++++++++----------- 2 files changed, 71 insertions(+), 37 deletions(-) (limited to 'indra/newview/scripts/lua') diff --git a/indra/newview/scripts/lua/WaitQueue.lua b/indra/newview/scripts/lua/WaitQueue.lua index f69baff09b..a34dbef4d7 100644 --- a/indra/newview/scripts/lua/WaitQueue.lua +++ b/indra/newview/scripts/lua/WaitQueue.lua @@ -43,7 +43,7 @@ function WaitQueue:_wake_waiters() -- more-or-less round robin fairness. But skip any coroutines that -- have gone dead in the meantime. local waiter = table.remove(self._waiters, 1) - while waiter and coroutine.status(waiter) ~= "suspended" do + while waiter and fiber.status(waiter) == "dead" do waiter = table.remove(self._waiters, 1) end -- do we still have at least one waiting coroutine? diff --git a/indra/newview/scripts/lua/fiber.lua b/indra/newview/scripts/lua/fiber.lua index 8ed99f12b7..7dc67f510c 100644 --- a/indra/newview/scripts/lua/fiber.lua +++ b/indra/newview/scripts/lua/fiber.lua @@ -178,36 +178,6 @@ function fiber.wake(co) -- but don't yet resume it: that happens next time we reach yield() end --- Run fibers until all but main have terminated: return nil. --- Or until configured idle() callback returns x ~= nil: return x. -function fiber.run() - -- A fiber calling run() is not also doing other useful work. Remove the - -- calling fiber from the ready list. Otherwise yield() would keep seeing - -- that our caller is ready and return to us, instead of realizing that - -- all coroutines are waiting and call idle(). But don't say we're - -- waiting, either, because then when all other fibers have terminated - -- we'd call idle() forever waiting for something to make us ready again. - local i = table.find(ready, fiber.running()) - if i then - table.remove(ready, i) - end - local others, idle_done - repeat - debug('%s calling fiber.run() calling yield()', fiber.get_name()) - others, idle_done = fiber.yield() - debug("%s fiber.run()'s yield() returned %s, %s", fiber.get_name(), - tostring(others), tostring(idle_done)) - until (not others) - debug('%s fiber.run() done', fiber.get_name()) - -- For whatever it's worth, put our own fiber back in the ready list. - table.insert(ready, fiber.running()) - -- Once there are no more waiting fibers, and the only ready fiber is - -- us, return to caller. All previously-launched fibers are done. Possibly - -- the chunk is done, or the chunk may decide to launch a new batch of - -- fibers. - return idle_done -end - -- pop and return the next not-dead fiber in the ready list, or nil if none remain local function live_ready_iter() -- don't write @@ -237,16 +207,24 @@ local function prune_waiting() end end --- Give other ready fibers a chance to run, leaving this one ready, returning --- after a cycle. Returns: --- * true, nil if there remain other live fibers, whether ready or waiting +-- Run other ready fibers, leaving this one ready, returning after a cycle. +-- Returns: +-- * true, nil if there remain other live fibers, whether ready or waiting, +-- but it's our turn to run -- * false, nil if this is the only remaining fiber --- * nil, x if configured idle() callback returned non-nil x -function fiber.yield() +-- * nil, x if configured idle() callback returns non-nil x +local function scheduler() + -- scheduler() is asymmetric because Lua distinguishes the main thread + -- from other coroutines. The main thread can't yield; it can only resume + -- other coroutines. So although an arbitrary coroutine could resume still + -- other arbitrary coroutines, it could NOT resume the main thread because + -- the main thread can't yield. Therefore, scheduler() delegates its real + -- processing to the main thread. If called from a coroutine, pass control + -- back to the main thread. if coroutine.running() then -- seize the opportunity to make sure the viewer isn't shutting down -- check_stop() - -- this is a real coroutine, yield normally to main or whoever + -- this is a real coroutine, yield normally to main thread coroutine.yield() -- main certainly still exists return true @@ -294,4 +272,60 @@ function fiber.yield() until false end +-- Let other fibers run. This is useful in either of two cases: +-- * fiber.wait() calls this to run other fibers while this one is waiting. +-- fiber.yield() (and therefore fiber.wait()) works from the main thread as +-- well as from explicitly-launched fibers, without the caller having to +-- care. +-- * A long-running fiber that doesn't often call fiber.wait() should sprinkle +-- in fiber.yield() calls to interleave processing on other fibers. +function fiber.yield() + -- The difference between this and fiber.run() is that fiber.yield() + -- assumes its caller has work to do. yield() returns to its caller as + -- soon as scheduler() pops this fiber from the ready list. fiber.run() + -- continues looping until all other fibers have terminated, or the + -- set_idle() callback tells it to stop. + local others, idle_done = scheduler() + -- scheduler() returns either if we're ready, or if idle_done ~= nil. + if idle_done ~= nil then + -- Returning normally from yield() means the caller can carry on with + -- its pending work. But in this case scheduler() returned because the + -- configured set_idle() function interrupted it -- not because we're + -- actually ready. Don't return normally. + error('fiber.set_idle() interrupted yield() with: ' .. tostring(idle_done)) + end + -- We're ready! Just return to caller. In this situation we don't care + -- whether there are other ready fibers. +end + +-- Run fibers until all but main have terminated: return nil. +-- Or until configured idle() callback returns x ~= nil: return x. +function fiber.run() + -- A fiber calling run() is not also doing other useful work. Remove the + -- calling fiber from the ready list. Otherwise yield() would keep seeing + -- that our caller is ready and return to us, instead of realizing that + -- all coroutines are waiting and call idle(). But don't say we're + -- waiting, either, because then when all other fibers have terminated + -- we'd call idle() forever waiting for something to make us ready again. + local i = table.find(ready, fiber.running()) + if i then + table.remove(ready, i) + end + local others, idle_done + repeat + debug('%s calling fiber.run() calling scheduler()', fiber.get_name()) + others, idle_done = scheduler() + debug("%s fiber.run()'s scheduler() returned %s, %s", fiber.get_name(), + tostring(others), tostring(idle_done)) + until (not others) + debug('%s fiber.run() done', fiber.get_name()) + -- For whatever it's worth, put our own fiber back in the ready list. + table.insert(ready, fiber.running()) + -- Once there are no more waiting fibers, and the only ready fiber is + -- us, return to caller. All previously-launched fibers are done. Possibly + -- the chunk is done, or the chunk may decide to launch a new batch of + -- fibers. + return idle_done +end + return fiber -- cgit v1.2.3