1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
|
-- WaitQueue is-a Queue with one added feature: when the queue is empty,
-- the Dequeue() operation blocks (suspends) the calling coroutine until some
-- other coroutine Enqueue()s a new value.
--
-- The base class comes from the project's 'Queue' module.
local Queue = require('Queue')
-- The WaitQueue class table is itself produced by Queue:new(); presumably
-- that is what lets it fall back on Queue's methods (this depends on how
-- Queue:new() sets up metatables -- confirm in Queue).
local WaitQueue = Queue:new()
-- Construct a new instance of WaitQueue (or of whatever class table new()
-- is invoked on).
-- Returns an object built by Queue:new(), re-parented onto `self`, carrying
-- an empty list of waiting coroutines and a cleared closed flag.
function WaitQueue:new()
    local instance = Queue:new()
    -- instances resolve missing fields through the class table
    self.__index = self
    setmetatable(instance, self)
    -- per-instance state: not closed yet, nobody waiting yet
    instance._closed = false
    instance._waiters = {}
    return instance
end
-- Append `value` to the queue, then run the wake-up pass so that a consumer
-- suspended in Dequeue() can pick it up.
-- Raises an error if the queue has already been closed.
function WaitQueue:Enqueue(value)
    if not self._closed then
        -- Writing Queue:Enqueue(value) here would be wrong: the colon form
        -- would pass the Queue class table as the receiver. Spell out the
        -- call so the base-class method operates on this instance.
        Queue.Enqueue(self, value)
        self:_wake_waiters()
        return
    end
    error("can't Enqueue() on closed Queue")
end
-- Resume coroutines suspended in Dequeue() for as long as there is something
-- for them to observe: either a queued item, or the fact that the queue has
-- been closed.
-- Called after every Enqueue() and from close(). Re-raises any error raised
-- inside a resumed coroutine.
function WaitQueue:_wake_waiters()
    -- WaitQueue is designed to support multi-producer, multi-consumer use
    -- cases. With multiple consumers, if more than one is trying to
    -- Dequeue() from an empty WaitQueue, we'll have multiple waiters.
    -- Unlike OS threads, with cooperative concurrency it doesn't make sense
    -- to "notify all": we need resume only one of the waiting Dequeue()
    -- callers per available item. But since resuming that caller might
    -- entail either Enqueue() or Dequeue() calls, recheck every time around
    -- to see if we must resume another waiting coroutine.
    --
    -- A closed queue must wake its waiters even when it is empty: consumers
    -- only ever wait on an EMPTY queue, so testing "not empty" alone would
    -- leave them suspended forever after close(). Once closed, a resumed
    -- Dequeue() returns nil without re-registering, so this loop drains the
    -- waiter list and terminates.
    while (self._closed or not self:IsEmpty()) and #self._waiters > 0 do
        -- Pop the oldest waiting coroutine instead of the most recent, for
        -- more-or-less round robin fairness. But skip any coroutines that
        -- are no longer resumable (e.g. have gone dead in the meantime).
        local waiter = table.remove(self._waiters, 1)
        while waiter and coroutine.status(waiter) ~= "suspended" do
            waiter = table.remove(self._waiters, 1)
        end
        -- do we still have at least one waiting coroutine?
        if waiter then
            -- don't pass the head item: let the resumed coroutine retrieve it
            local ok, message = coroutine.resume(waiter)
            -- if resuming that waiter encountered an error, don't swallow it
            if not ok then
                error(message)
            end
        end
    end
end
-- Remove and return the item at the head of the queue.
-- If the queue is empty and still open, the calling coroutine registers
-- itself as a waiter and yields until it is resumed, then checks again.
-- Returns nil once the queue is both empty and closed.
-- Raises an error when an empty, open queue is dequeued from the main
-- coroutine, since there is nothing to yield to.
function WaitQueue:Dequeue()
    while self:IsEmpty() do
        -- Don't check for closed until the queue is empty: producer can close
        -- the queue while there are still items left, and we want the
        -- consumer(s) to retrieve those last few items.
        if self._closed then
            return nil
        end
        -- Lua 5.1 (and Luau) return nil from coroutine.running() on the main
        -- coroutine; Lua 5.2+ return the main thread plus a second value of
        -- true instead. Check both so the guard works on every dialect
        -- rather than queueing the main thread and failing inside yield().
        local coro, is_main = coroutine.running()
        if coro == nil or is_main then
            error("WaitQueue:Dequeue() trying to suspend main coroutine")
        end
        -- add the running coroutine to the list of waiters
        table.insert(self._waiters, coro)
        -- then let somebody else run
        coroutine.yield()
    end
    -- here we're sure this queue isn't empty; use the hand-expanded call so
    -- the base-class method runs against this instance
    return Queue.Dequeue(self)
end
-- Mark this queue as closed. After this, Enqueue() raises an error, while
-- Dequeue() still hands out items that remain queued and returns nil once
-- the queue is empty. Conceptually, closing is like enqueueing an
-- end-of-stream marker.
function WaitQueue:close()
    -- set the flag first, so anything resumed below observes the closed state
    self._closed = true
    -- Run the wake-up pass so suspended consumers get a chance to notice.
    -- NOTE(review): whether an idle consumer is actually resumed here is
    -- decided by _wake_waiters()'s own loop condition.
    self:_wake_waiters()
end
-- Module value: the WaitQueue class table, so require() callers receive it.
return WaitQueue
|