1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
|
-- Organize Lua coroutines into fibers.
-- In this usage, the difference between coroutines and fibers is that fibers
-- have a scheduler. Yielding a fiber means allowing other fibers, plural, to
-- run: it's more than just returning control to the specific Lua thread that
-- resumed the running coroutine.
-- fiber.launch() creates a new fiber ready to run.
-- fiber.status() reports (augmented) status of the passed fiber: instead of
-- 'suspended', it returns either 'ready' or 'waiting'
-- fiber.yield() allows other fibers to run, but leaves the calling fiber
-- ready to run.
-- fiber.wait() marks the running fiber not ready, and resumes other fibers.
-- fiber.wake() marks the designated suspended fiber ready to run, but does
-- not yet resume it.
-- fiber.run() runs all current fibers until all have terminated (successfully
-- or with an error).
local printf = require 'printf'
-- local debug = printf
local function debug(...) end
local coro = require 'coro'
local fiber = {}
-- The tables in which we track fibers must have weak keys so dead fibers
-- can be garbage-collected.
local weak_values = {__mode='v'}
local weak_keys = {__mode='k'}
-- Track each current fiber as being either ready to run or not ready
-- (waiting). wait() moves the running fiber from ready to waiting; wake()
-- moves the designated fiber from waiting back to ready.
-- The ready table is used as a list so yield() can go round robin.
local ready = setmetatable({'main'}, weak_keys)
-- The waiting table is used as a set because order doesn't matter.
local waiting = setmetatable({}, weak_keys)
-- Every fiber has a name, for diagnostic purposes. Names must be unique.
-- A colliding name will be suffixed with an integer.
-- Predefine 'main' with our marker so nobody else claims that name.
local names = setmetatable({main='main'}, weak_keys)
local byname = setmetatable({main='main'}, weak_values)
-- each colliding name has its own distinct suffix counter
local suffix = {}
-- Specify a nullary idle() callback to be called whenever there are no ready
-- fibers but there are waiting fibers. The idle() callback is responsible for
-- changing zero or more waiting fibers to ready fibers by calling
-- fiber.wake(), although a given call may leave them all still waiting.
-- When there are no ready fibers, it's a good idea for the idle() function to
-- return control to a higher-level execution agent. Simply returning without
-- changing any fiber's status will spin the CPU.
-- The idle() callback can return non-nil to exit fiber.run() with that value.
function fiber._idle()
error('fiber.yield(): you must first call set_idle(nullary idle() function)')
end
function fiber.set_idle(func)
fiber._idle = func
end
-- Launch a new Lua fiber, ready to run.
function fiber.launch(name, func, ...)
local args = table.pack(...)
local co = coroutine.create(function() func(table.unpack(args)) end)
-- a new fiber is ready to run
table.insert(ready, co)
local namekey = name
while byname[namekey] do
if not suffix[name] then
suffix[name] = 1
end
suffix[name] += 1
namekey = name .. tostring(suffix[name])
end
-- found a namekey not yet in byname: set it
byname[namekey] = co
-- and remember it as this fiber's name
names[co] = namekey
-- debug('launch(%s)', namekey)
-- debug('byname[%s] = %s', namekey, tostring(byname[namekey]))
-- debug('names[%s] = %s', tostring(co), names[co])
-- debug('ready[-1] = %s', tostring(ready[#ready]))
end
-- for debugging
function fiber.print_all()
print('Ready fibers:' .. if next(ready) then '' else ' none')
for _, co in pairs(ready) do
printf(' %s: %s', fiber.get_name(co), fiber.status(co))
end
print('Waiting fibers:' .. if next(waiting) then '' else ' none')
for co in pairs(waiting) do
printf(' %s: %s', fiber.get_name(co), fiber.status(co))
end
end
-- return either the running coroutine or, if called from the main thread,
-- 'main'
function fiber.running()
return coroutine.running() or 'main'
end
-- Query a fiber's name (nil for the running fiber)
function fiber.get_name(co)
return names[co or fiber.running()] or 'unknown'
end
-- Query status of the passed fiber
function fiber.status(co)
local running = coroutine.running()
if (not co) or co == running then
-- silly to ask the status of the running fiber: it's 'running'
return 'running'
end
if co ~= 'main' then
-- for any coroutine but main, consult coroutine.status()
local status = coroutine.status(co)
if status ~= 'suspended' then
return status
end
-- here co is suspended, answer needs further refinement
else
-- co == 'main'
if not running then
-- asking about 'main' from the main fiber
return 'running'
end
-- asking about 'main' from some other fiber, so presumably main is suspended
end
-- here we know co is suspended -- but is it ready to run?
if waiting[co] then
return 'waiting'
end
-- not waiting should imply ready: sanity check
if table.find(ready, co) then
return 'ready'
end
-- Calls within yield() between popping the next ready fiber and
-- re-appending it to the list are in this state. Once we're done
-- debugging yield(), we could reinstate either of the below.
-- error(string.format('fiber.status(%s) is stumped', fiber.get_name(co)))
-- print(string.format('*** fiber.status(%s) is stumped', fiber.get_name(co)))
return '(unknown)'
end
-- change the running fiber's status to waiting
local function set_waiting()
-- if called from the main fiber, inject a 'main' marker into the list
co = fiber.running()
-- delete from ready list
local i = table.find(ready, co)
if i then
table.remove(ready, i)
end
-- add to waiting list
waiting[co] = true
end
-- Suspend the current fiber until some other fiber calls fiber.wake() on it
function fiber.wait()
set_waiting()
-- now yield to other fibers
fiber.yield()
end
-- Mark a suspended fiber as being ready to run
function fiber.wake(co)
if not waiting[co] then
error(string.format('fiber.wake(%s) but status=%s, ready=%s, waiting=%s',
names[co], fiber.status(co), ready[co], waiting[co]))
end
-- delete from waiting list
waiting[co] = nil
-- add to end of ready list
table.insert(ready, co)
-- but don't yet resume it: that happens next time we reach yield()
end
-- pop and return the next not-dead fiber in the ready list, or nil if none remain
local function live_ready_iter()
-- don't write
-- for co in table.remove, ready, 1
-- because it would keep passing a new second parameter!
for co in function() return table.remove(ready, 1) end do
debug('%s live_ready_iter() sees %s, status %s',
fiber.get_name(), fiber.get_name(co), fiber.status(co))
-- keep removing the head entry until we find one that's not dead,
-- discarding any dead coroutines along the way
if co == 'main' or coroutine.status(co) ~= 'dead' then
debug('%s live_ready_iter() returning %s',
fiber.get_name(), fiber.get_name(co))
return co
end
end
debug('%s live_ready_iter() returning nil', fiber.get_name())
return nil
end
-- prune the set of waiting fibers
local function prune_waiting()
for waiter in pairs(waiting) do
if waiter ~= 'main' and coroutine.status(waiter) == 'dead' then
waiting[waiter] = nil
end
end
end
-- Run other ready fibers, leaving this one ready, returning after a cycle.
-- Returns:
-- * true, nil if there remain other live fibers, whether ready or waiting,
-- but it's our turn to run
-- * false, nil if this is the only remaining fiber
-- * nil, x if configured idle() callback returns non-nil x
local function scheduler()
-- scheduler() is asymmetric because Lua distinguishes the main thread
-- from other coroutines. The main thread can't yield; it can only resume
-- other coroutines. So although an arbitrary coroutine could resume still
-- other arbitrary coroutines, it could NOT resume the main thread because
-- the main thread can't yield. Therefore, scheduler() delegates its real
-- processing to the main thread. If called from a coroutine, pass control
-- back to the main thread.
if coroutine.running() then
-- seize the opportunity to make sure the viewer isn't shutting down
-- check_stop()
-- this is a real coroutine, yield normally to main thread
coroutine.yield()
-- main certainly still exists
return true
end
-- This is the main fiber: coroutine.yield() doesn't work.
-- Instead, resume each of the ready fibers.
-- Prune the set of waiting fibers after every time fiber business logic
-- runs (i.e. other fibers might have terminated or hit error), such as
-- here on entry.
prune_waiting()
local others, idle_stop
repeat
for co in live_ready_iter do
-- seize the opportunity to make sure the viewer isn't shutting down
-- check_stop()
-- before we re-append co, is it the only remaining entry?
others = next(ready)
-- co is live, re-append it to the ready list
table.insert(ready, co)
if co == 'main' then
-- Since we know the caller is the main fiber, it's our turn.
-- Tell caller if there are other ready or waiting fibers.
return others or next(waiting)
end
-- not main, but some other ready coroutine:
-- use coro.resume() so we'll propagate any error encountered
coro.resume(co)
prune_waiting()
end
-- Here there are no ready fibers. Are there any waiting fibers?
if not next(waiting) then
return false
end
-- there are waiting fibers: call consumer's configured idle() function
idle_stop = fiber._idle()
if idle_stop ~= nil then
return nil, idle_stop
end
prune_waiting()
-- loop "forever", that is, until:
-- * main is ready, or
-- * there are neither ready fibers nor waiting fibers, or
-- * fiber._idle() returned non-nil
until false
end
-- Let other fibers run. This is useful in either of two cases:
-- * fiber.wait() calls this to run other fibers while this one is waiting.
-- fiber.yield() (and therefore fiber.wait()) works from the main thread as
-- well as from explicitly-launched fibers, without the caller having to
-- care.
-- * A long-running fiber that doesn't often call fiber.wait() should sprinkle
-- in fiber.yield() calls to interleave processing on other fibers.
function fiber.yield()
-- The difference between this and fiber.run() is that fiber.yield()
-- assumes its caller has work to do. yield() returns to its caller as
-- soon as scheduler() pops this fiber from the ready list. fiber.run()
-- continues looping until all other fibers have terminated, or the
-- set_idle() callback tells it to stop.
local others, idle_done = scheduler()
-- scheduler() returns either if we're ready, or if idle_done ~= nil.
if idle_done ~= nil then
-- Returning normally from yield() means the caller can carry on with
-- its pending work. But in this case scheduler() returned because the
-- configured set_idle() function interrupted it -- not because we're
-- actually ready. Don't return normally.
error('fiber.set_idle() interrupted yield() with: ' .. tostring(idle_done))
end
-- We're ready! Just return to caller. In this situation we don't care
-- whether there are other ready fibers.
end
-- Run fibers until all but main have terminated: return nil.
-- Or until configured idle() callback returns x ~= nil: return x.
function fiber.run()
-- A fiber calling run() is not also doing other useful work. Remove the
-- calling fiber from the ready list. Otherwise yield() would keep seeing
-- that our caller is ready and return to us, instead of realizing that
-- all coroutines are waiting and call idle(). But don't say we're
-- waiting, either, because then when all other fibers have terminated
-- we'd call idle() forever waiting for something to make us ready again.
local i = table.find(ready, fiber.running())
if i then
table.remove(ready, i)
end
local others, idle_done
repeat
debug('%s calling fiber.run() calling scheduler()', fiber.get_name())
others, idle_done = scheduler()
debug("%s fiber.run()'s scheduler() returned %s, %s", fiber.get_name(),
tostring(others), tostring(idle_done))
until (not others)
debug('%s fiber.run() done', fiber.get_name())
-- For whatever it's worth, put our own fiber back in the ready list.
table.insert(ready, fiber.running())
-- Once there are no more waiting fibers, and the only ready fiber is
-- us, return to caller. All previously-launched fibers are done. Possibly
-- the chunk is done, or the chunk may decide to launch a new batch of
-- fibers.
return idle_done
end
return fiber
|