1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
|
/**
* @file threadsafeschedule.h
* @author Nat Goodspeed
* @date 2021-10-02
* @brief ThreadSafeSchedule is an ordered queue in which every item has an
* associated timestamp.
*
* $LicenseInfo:firstyear=2021&license=viewerlgpl$
* Copyright (c) 2021, Linden Research, Inc.
* $/LicenseInfo$
*/
#if ! defined(LL_THREADSAFESCHEDULE_H)
#define LL_THREADSAFESCHEDULE_H
#include "chrono.h"
#include "llexception.h"
#include "llthreadsafequeue.h"
#include "tuple.h"
#include <chrono>
#include <tuple>
namespace LL
{
namespace ThreadSafeSchedulePrivate
{
using TimePoint = std::chrono::steady_clock::time_point;
// Bundle consumer's data with a TimePoint to order items by timestamp.
template <typename... Args>
using TimestampedTuple = std::tuple<TimePoint, Args...>;
// comparison functor for TimedTuples -- see TimedQueue comments
struct ReverseTupleOrder
{
template <typename Tuple>
bool operator()(const Tuple& left, const Tuple& right) const
{
return std::get<0>(left) > std::get<0>(right);
}
};
template <typename... Args>
using TimedQueue = PriorityQueueAdapter<
TimestampedTuple<Args...>,
// std::vector is the default storage for std::priority_queue,
// have to restate to specify comparison template parameter
std::vector<TimestampedTuple<Args...>>,
// std::priority_queue uses a counterintuitive comparison
// behavior: the default std::less comparator is used to present
// the *highest* value as top(). So to sort by earliest timestamp,
// we must invert by using >.
ReverseTupleOrder>;
} // namespace ThreadSafeSchedulePrivate
/**
* ThreadSafeSchedule is an ordered LLThreadSafeQueue in which every item
* is given an associated timestamp. That is, TimePoint is implicitly
* prepended to the std::tuple with the specified types.
*
* Items are popped in increasing chronological order. Moreover, any item
* with a timestamp in the future is held back until
* std::chrono::steady_clock reaches that timestamp.
*/
template <typename... Args>
class ThreadSafeSchedule:
public LLThreadSafeQueue<ThreadSafeSchedulePrivate::TimestampedTuple<Args...>,
ThreadSafeSchedulePrivate::TimedQueue<Args...>>
{
public:
using DataTuple = std::tuple<Args...>;
using TimeTuple = ThreadSafeSchedulePrivate::TimestampedTuple<Args...>;
private:
using super = LLThreadSafeQueue<TimeTuple, ThreadSafeSchedulePrivate::TimedQueue<Args...>>;
using lock_t = typename super::lock_t;
// VS 2017 needs this due to a bug:
// https://developercommunity.visualstudio.com/t/cannot-access-protected-enumerator-of-enclosing-cl/203430
enum pop_result { EMPTY=super::EMPTY, DONE=super::DONE, WAITING=super::WAITING, POPPED=super::POPPED };
public:
using Closed = LLThreadSafeQueueInterrupt;
using TimePoint = ThreadSafeSchedulePrivate::TimePoint;
using Clock = TimePoint::clock;
ThreadSafeSchedule(size_t capacity=1024):
super(capacity)
{}
/*----------------------------- push() -----------------------------*/
/// explicitly pass TimeTuple
using super::push;
/// pass DataTuple with implicit now
// This could be ambiguous for Args with a single type. Unfortunately
// we can't enable_if an individual method with a condition based on
// the *class* template arguments, only on that method's template
// arguments. We could specialize this class for the single-Args case;
// we could minimize redundancy by breaking out a common base class...
void push(const DataTuple& tuple)
{
LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD;
push(tuple_cons(Clock::now(), tuple));
}
/// individually pass each component of the TimeTuple
void push(const TimePoint& time, Args&&... args)
{
LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD;
push(TimeTuple(time, std::forward<Args>(args)...));
}
/// individually pass every component except the TimePoint (implies now)
// This could be ambiguous if the first specified template parameter
// type is also TimePoint. We could try to disambiguate, but a simpler
// approach would be for the caller to explicitly construct DataTuple
// and call that overload.
void push(Args&&... args)
{
LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD;
push(Clock::now(), std::forward<Args>(args)...);
}
/*--------------------------- tryPush() ----------------------------*/
/// explicit TimeTuple
using super::tryPush;
/// DataTuple with implicit now
bool tryPush(const DataTuple& tuple)
{
LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD;
return tryPush(tuple_cons(Clock::now(), tuple));
}
/// individually pass components
bool tryPush(const TimePoint& time, Args&&... args)
{
LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD;
return tryPush(TimeTuple(time, std::forward<Args>(args)...));
}
/// individually pass components with implicit now
bool tryPush(Args&&... args)
{
LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD;
return tryPush(Clock::now(), std::forward<Args>(args)...);
}
/*-------------------------- tryPushFor() --------------------------*/
/// explicit TimeTuple
using super::tryPushFor;
/// DataTuple with implicit now
template <typename Rep, typename Period>
bool tryPushFor(const std::chrono::duration<Rep, Period>& timeout,
const DataTuple& tuple)
{
LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD;
return tryPushFor(timeout, tuple_cons(Clock::now(), tuple));
}
/// individually pass components
template <typename Rep, typename Period>
bool tryPushFor(const std::chrono::duration<Rep, Period>& timeout,
const TimePoint& time, Args&&... args)
{
LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD;
return tryPushFor(TimeTuple(time, std::forward<Args>(args)...));
}
/// individually pass components with implicit now
template <typename Rep, typename Period>
bool tryPushFor(const std::chrono::duration<Rep, Period>& timeout,
Args&&... args)
{
LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD;
return tryPushFor(Clock::now(), std::forward<Args>(args)...);
}
/*------------------------- tryPushUntil() -------------------------*/
/// explicit TimeTuple
using super::tryPushUntil;
/// DataTuple with implicit now
template <typename Clock, typename Duration>
bool tryPushUntil(const std::chrono::time_point<Clock, Duration>& until,
const DataTuple& tuple)
{
LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD;
return tryPushUntil(until, tuple_cons(Clock::now(), tuple));
}
/// individually pass components
template <typename Clock, typename Duration>
bool tryPushUntil(const std::chrono::time_point<Clock, Duration>& until,
const TimePoint& time, Args&&... args)
{
LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD;
return tryPushUntil(until, TimeTuple(time, std::forward<Args>(args)...));
}
/// individually pass components with implicit now
template <typename Clock, typename Duration>
bool tryPushUntil(const std::chrono::time_point<Clock, Duration>& until,
Args&&... args)
{
LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD;
return tryPushUntil(until, Clock::now(), std::forward<Args>(args)...);
}
/*----------------------------- pop() ------------------------------*/
// Our consumer may or may not care about the timestamp associated
// with each popped item, so we allow retrieving either DataTuple or
// TimeTuple. One potential use would be to observe, and possibly
// adjust for, the time lag between the item time and the actual
// current time.
/// pop DataTuple by value
// It would be great to notice when sizeof...(Args) == 1 and directly
// return the first (only) value, instead of making pop()'s caller
// call std::get<0>(value). See push(DataTuple) remarks for why we
// haven't yet jumped through those hoops.
DataTuple pop()
{
LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD;
return tuple_cdr(popWithTime());
}
/// pop TimeTuple by value
TimeTuple popWithTime()
{
LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD;
lock_t lock(super::mLock);
// We can't just sit around waiting forever, given that there may
// be items in the queue that are not yet ready but will *become*
// ready in the near future. So in fact, with this class, every
// pop() becomes a tryPopUntil(), constrained to the timestamp of
// the head item. It almost doesn't matter what we specify for the
// caller's time constraint -- all we really care about is the
// head item's timestamp. Since pop() and popWithTime() are
// defined to wait until either an item becomes available or the
// queue is closed, loop until one of those things happens. The
// constraint we pass just determines how often we'll loop while
// waiting.
TimeTuple tt;
while (true)
{
// Pick a point suitably far into the future.
TimePoint until = TimePoint::clock::now() + std::chrono::hours(24);
pop_result popped = tryPopUntil_(lock, until, tt);
if (popped == POPPED)
return std::move(tt);
// DONE: throw, just as super::pop() does
if (popped == DONE)
{
LLTHROW(LLThreadSafeQueueInterrupt());
}
// WAITING: we've still got items to drain.
// EMPTY: not closed, so it's worth waiting for more items.
// Either way, loop back to wait.
}
}
// We can use tryPop(TimeTuple&) just as it stands; the only behavior
// difference is in our canPop() override method.
using super::tryPop;
/// tryPop(DataTuple&)
bool tryPop(DataTuple& tuple)
{
LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD;
TimeTuple tt;
if (! super::tryPop(tt))
return false;
tuple = tuple_cdr(std::move(tt));
return true;
}
/// for when Args has exactly one type
bool tryPop(typename std::tuple_element<1, TimeTuple>::type& value)
{
LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD;
TimeTuple tt;
if (! super::tryPop(tt))
return false;
value = std::get<1>(std::move(tt));
return true;
}
/// tryPopFor()
template <typename Rep, typename Period, typename Tuple>
bool tryPopFor(const std::chrono::duration<Rep, Period>& timeout, Tuple& tuple)
{
LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD;
// It's important to use OUR tryPopUntil() implementation, rather
// than delegating immediately to our base class.
return tryPopUntil(Clock::now() + timeout, tuple);
}
/// tryPopUntil(TimeTuple&)
template <typename Clock, typename Duration>
bool tryPopUntil(const std::chrono::time_point<Clock, Duration>& until,
TimeTuple& tuple)
{
LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD;
// super::tryPopUntil() wakes up when an item becomes available or
// we hit 'until', whichever comes first. Thing is, the current
// head of the queue could become ready sooner than either of
// those events, and we need to deliver it as soon as it does.
// Don't wait past the TimePoint of the head item.
// Naturally, lock the queue before peeking at mStorage.
return super::tryLockUntil(
until,
[this, until, &tuple](lock_t& lock)
{
// Use our time_point_cast to allow for 'until' that's a
// time_point type other than TimePoint.
return POPPED ==
tryPopUntil_(lock, LL::time_point_cast<TimePoint>(until), tuple);
});
}
pop_result tryPopUntil_(lock_t& lock, const TimePoint& until, TimeTuple& tuple)
{
LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD;
TimePoint adjusted = until;
if (! super::mStorage.empty())
{
LL_PROFILE_ZONE_NAMED("tpu - adjust");
// use whichever is earlier: the head item's timestamp, or
// the caller's limit
adjusted = min(std::get<0>(super::mStorage.front()), adjusted);
}
// now delegate to base-class tryPopUntil_()
pop_result popped;
{
LL_PROFILE_ZONE_NAMED("tpu - super");
while ((popped = pop_result(super::tryPopUntil_(lock, adjusted, tuple))) == WAITING)
{
// If super::tryPopUntil_() returns WAITING, it means there's
// a head item, but it's not yet time. But it's worth looping
// back to recheck.
}
}
return popped;
}
/// tryPopUntil(DataTuple&)
template <typename Clock, typename Duration>
bool tryPopUntil(const std::chrono::time_point<Clock, Duration>& until,
DataTuple& tuple)
{
LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD;
TimeTuple tt;
if (! tryPopUntil(until, tt))
return false;
tuple = tuple_cdr(std::move(tt));
return true;
}
/// for when Args has exactly one type
template <typename Clock, typename Duration>
bool tryPopUntil(const std::chrono::time_point<Clock, Duration>& until,
typename std::tuple_element<1, TimeTuple>::type& value)
{
LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD;
TimeTuple tt;
if (! tryPopUntil(until, tt))
return false;
value = std::get<1>(std::move(tt));
return true;
}
/*------------------------------ etc. ------------------------------*/
// We can't hide items that aren't yet ready because we can't traverse
// the underlying priority_queue: it has no iterators, only top(). So
// a consumer could observe size() > 0 and yet tryPop() returns false.
// Shrug, in a multi-consumer scenario that would be expected behavior.
using super::size;
// open/closed state
using super::close;
using super::isClosed;
using super::done;
private:
// this method is called by base class pop_() every time we're
// considering whether to deliver the current head element
bool canPop(const TimeTuple& head) const override
{
LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD;
// an item with a future timestamp isn't yet ready to pop
// (should we add some slop for overhead?)
return std::get<0>(head) <= Clock::now();
}
};
} // namespace LL
#endif /* ! defined(LL_THREADSAFESCHEDULE_H) */
|