summaryrefslogtreecommitdiff
path: root/indra/llmessage/llpumpio.h
blob: 50f7411298dbb51426347bb5e2ecc3b2a2161184 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
/** 
 * @file llpumpio.h
 * @author Phoenix
 * @date 2004-11-19
 * @brief Declaration of pump class which manages io chains.
 *
 * Copyright (c) 2004-$CurrentYear$, Linden Research, Inc.
 * $License$
 */

#ifndef LL_LLPUMPIO_H
#define LL_LLPUMPIO_H

#include <list>
#include <set>
#include <utility>
#include <vector>
#if LL_LINUX  // needed for PATH_MAX in APR.
#include <sys/param.h>
#endif

#include "apr-1/apr_pools.h"
#include "llbuffer.h"
#include "llframetimer.h"
#include "lliopipe.h"
#include "llrun.h"

// Define this to enable use with the APR thread library.
//#define LL_THREADS_APR 1

// Some simple constants to help with chain timeouts. They are only
// declared here; the definitions (and therefore the actual values) are
// not visible in this header. They have the same type (F32) as the
// timeout parameter, in seconds, of LLPumpIO::addChain() and
// LLPumpIO::setTimeoutSeconds().
// NOTE(review): NEVER_CHAIN_EXPIRY_SECS is presumably 0.0f, since the
// addChain() documentation says that passing 0.0f means "never expire" -
// confirm against the definition.
extern const F32 DEFAULT_CHAIN_EXPIRY_SECS;
extern const F32 SHORT_CHAIN_EXPIRY_SECS;
extern const F32 NEVER_CHAIN_EXPIRY_SECS;

/** 
 * @class LLPumpIO
 * @brief Class to manage sets of io chains.
 *
 * The pump class provides a thread abstraction for doing IO based
 * communication between two threads in a structured way, optimized for
 * processor time. The primary usage is to create a pump, and call
 * <code>pump()</code> on a thread used for IO and call
 * <code>respond()</code> on a thread that is expected to do higher
 * level processing. You can call almost any other method from any
 * thread - see notes for each method for details. In order for the
 * threading abstraction to work, you need to call <code>prime()</code>
 * with a valid apr pool.
 * A pump instance manages much of the state for the pipe, including
 * the list of pipes in the chain, the channel for each element in the
 * chain, the buffer, and if any pipe has marked the stream or process
 * as done. Pipes can also set file descriptor based conditional
 * statements so that calls to process do not happen until data is
 * ready to be read or written.  Pipes control execution of calls to
 * process by returning a status code such as STATUS_OK or
 * STATUS_BREAK.
 * One way to conceptualize the way IO will work is that a pump
 * combines the unit processing of pipes to behave like file pipes on
 * the unix command line.
 */
class LLPumpIO
{
	// NOTE(review): this class stores raw APR handles (pool, pollset,
	// mutexes) and declares a destructor, but does not disable copy
	// construction or assignment - confirm that instances are never
	// copied.
public:
	/**
	 * @brief Constructor.
	 *
	 * @param pool The apr pool handed to the pump at construction.
	 * NOTE(review): presumably forwarded to <code>initialize()</code> -
	 * confirm against the implementation.
	 */
	LLPumpIO(apr_pool_t* pool);

	/**
	 * @brief Destructor.
	 */
	~LLPumpIO();

	/**
	 * @brief Prepare this pump for usage.
	 *
	 * If you fail to call this method prior to use, the pump will
	 * try to work, but will not come with any thread locking
	 * mechanisms.
	 * @param pool The apr pool to use.
	 * @return Returns true if the pump is primed.
	 */
	bool prime(apr_pool_t* pool);

	/**
	 * @brief Typedef for having a chain of pipes.
	 */
	typedef std::vector<LLIOPipe::ptr_t> chain_t;

	/**
	 * @brief Add a chain to this pump and process in the next cycle.
	 *
	 * This method will automatically generate a buffer and assign
	 * each link in the chain as if it were the consumer to the
	 * previous.
	 * @param chain The pipes for the chain
	 * @param timeout The number of seconds in the future to
	 * expire. Pass in 0.0f to never expire.
	 * @return Returns true if anything was added to the pump.
	 */
	bool addChain(const chain_t& chain, F32 timeout);

	/**
	 * @brief Struct to associate a pipe with its buffer io indexes.
	 */
	struct LLLinkInfo
	{
		// The pipe for this link of the chain.
		LLIOPipe::ptr_t mPipe;
		// The buffer channels (io indexes) this pipe works on.
		LLChannelDescriptors mChannels;
	};

	/**
	 * @brief Typedef for having a chain of <code>LLLinkInfo</code>
	 * instances.
	 */
	typedef std::vector<LLLinkInfo> links_t;

	/**
	 * @brief Add a chain to this pump and process in the next cycle.
	 *
	 * This method provides a slightly more sophisticated method for
	 * adding a chain where the caller can specify which link elements
	 * are on what channels. This method will fail if no buffer is
	 * provided since any calls to generate new channels for the
	 * buffers will cause unpredictable interleaving of data.
	 * @param links The pipes and io indexes for the chain
	 * @param data Shared pointer to data buffer
	 * @param context Potentially undefined context meta-data for chain.
	 * @param timeout The number of seconds in the future to
	 * expire. Pass in 0.0f to never expire.
	 * @return Returns true if anything was added to the pump.
	 */
	bool addChain(
		const links_t& links,
		LLIOPipe::buffer_ptr_t data,
		LLSD context,
		F32 timeout);

	/**
	 * @brief Set or clear a timeout for the running chain
	 *
	 * @param timeout The number of seconds in the future to
	 * expire. Pass in 0.0f to never expire.
	 * @return Returns true if the timer was set.
	 */
	bool setTimeoutSeconds(F32 timeout);

	/**
	 * @brief Set up file descriptors for the running chain.
	 * @see rebuildPollset()
	 *
	 * There is currently a limit of one conditional per pipe.
	 * *NOTE: The internal mechanism for building a pollset based on
	 * pipe/pollfd/chain generates an epoll error on linux (and
	 * probably behaves similarly on other platforms) because the
	 * pollset rebuilder will add each apr_pollfd_t serially. This
	 * does not matter for pipes on the same chain, since any
	 * signalled pipe will eventually invoke a call to process(), but
	 * is a problem if the same apr_pollfd_t is on different
	 * chains. Once we have more than just network i/o on the pump,
	 * this might matter.
	 * *FIX: Given the structure of the pump and pipe relationship,
	 * this should probably go through a different mechanism than the
	 * pump. I think it would be best if the pipe had some kind of
	 * controller which was passed into <code>process()</code> rather
	 * than the pump which exposed this interface.
	 * @param pipe The pipe which is setting a conditional
	 * @param poll The entire socket and read/write condition - null to remove
	 * @return Returns true if the poll state was set.
	 */
	bool setConditional(LLIOPipe* pipe, const apr_pollfd_t* poll);

	/**
	 * @brief Lock the current chain.
	 * @see sleepChain() since it relies on the implementation of this method.
	 *
	 * This locks the currently running chain so that no more calls to
	 * <code>process()</code> are made until you call
	 * <code>clearLock()</code> with the lock identifier.
	 * *FIX: Given the structure of the pump and pipe relationship,
	 * this should probably go through a different mechanism than the
	 * pump. I think it would be best if the pipe had some kind of
	 * controller which was passed into <code>process()</code> rather
	 * than the pump which exposed this interface.
	 * @return Returns the lock identifier to be used in
	 * <code>clearLock()</code> or 0 on failure.
	 */
	S32 setLock();

	/**
	 * @brief Clears the identified lock.
	 *
	 * @param key The lock identifier returned by <code>setLock()</code>.
	 */
	void clearLock(S32 key);

	/**
	 * @brief Stop processing a chain for a while.
	 * @see setLock()
	 *
	 * This method will <em>not</em> update the timeout for this
	 * chain, so it is possible to sleep the chain until it is
	 * collected by the pump during a timeout cleanup.
	 * @param seconds The number of seconds in the future to
	 * resume processing.
	 * @return NOTE(review): the original description was truncated;
	 * presumably returns true if the chain was put to sleep - confirm
	 * against the implementation.
	 */
	bool sleepChain(F64 seconds);

	/**
	 * @brief Copy the currently running chain link info
	 *
	 * *FIX: Given the structure of the pump and pipe relationship,
	 * this should probably go through a different mechanism than the
	 * pump. I think it would be best if the pipe had some kind of
	 * controller which was passed into <code>process()</code> rather
	 * than the pump which exposed this interface.
	 * @param links A container for the links which will be appended
	 * @return Returns true if the currently running chain was copied.
	 */
	bool copyCurrentLinkInfo(links_t& links) const;

	/**
	 * @brief Call this method to call process on all running chains.
	 *
	 * This method iterates through the running chains. If all pipes
	 * on a chain are unconditionally ready, or if any pipe on the
	 * chain has a conditional (see <code>setConditional()</code>)
	 * whose file descriptor is ready, then <code>process()</code>
	 * will be called for all pipes on that chain which have
	 * requested it.
	 */
	void pump();

	/**
	 * @brief Add a chain to a special queue which will be called
	 * during the next call to <code>callback()</code> and then
	 * dropped from the queue.
	 *
	 * This overload is currently disabled (commented out).
	 * @param chain The IO chain that will get one <code>process()</code>.
	 */
	//void respond(const chain_t& pipes);

	/**
	 * @brief Add pipe to a special queue which will be called
	 * during the next call to <code>callback()</code> and then dropped
	 * from the queue.
	 *
	 * This call will add a single pipe, with no buffer, context, or
	 * channel information to the callback queue. It will be called
	 * once, and then dropped.
	 * @param pipe A single io pipe which will be called
	 * @return Returns true if anything was added to the pump.
	 */
	bool respond(LLIOPipe* pipe);

	/**
	 * @brief Add a chain to a special queue which will be called
	 * during the next call to <code>callback()</code> and then
	 * dropped from the queue.
	 *
	 * It is important to remember that you should not add a data
	 * buffer or context which may still be in another chain - that
	 * will almost certainly lead to problems. Ensure that you are
	 * done reading and writing to those parameters, or pass newly
	 * generated or empty pointers.
	 * @param links The pipes and io indexes for the chain
	 * @param data Shared pointer to data buffer
	 * @param context Potentially undefined context meta-data for chain.
	 * @return Returns true if anything was added to the pump.
	 */
	bool respond(
		const links_t& links,
		LLIOPipe::buffer_ptr_t data,
		LLSD context);

	/**
	 * @brief Run through the callback queue and call <code>process()</code>.
	 *
	 * This call will process all pending responses and call process
	 * on each. This method will then drop all processed callback
	 * requests which may lead to deleting the referenced objects.
	 */
	void callback();

	/**
	 * @brief Enumeration to send commands to the pump.
	 */
	enum EControl
	{
		PAUSE,
		RESUME
	};

	/**
	 * @brief Send a command to the pump.
	 *
	 * @param op What control to send to the pump.
	 */
	void control(EControl op);

protected:
	/**
	 * @brief State of the pump
	 */
	enum EState
	{
		NORMAL,
		PAUSING,
		PAUSED
	};

	// instance data
	EState mState;
	bool mRebuildPollset;
	apr_pollset_t* mPollset;
	S32 mPollsetClientID;
	S32 mNextLock;
	std::set<S32> mClearLocks;

	// This is the pump's runnable scheduler used for handling
	// expiring locks.
	LLRunner mRunner;

	// This structure is the stuff we track while running chains.
	struct LLChainInfo
	{
		// methods
		LLChainInfo();
		void setTimeoutSeconds(F32 timeout);

		// basic member data
		bool mInit;
		S32 mLock;
		LLFrameTimer mTimer;
		// Position within mChainLinks; processChain() leaves this
		// equal to mChainLinks.end() when it is finished.
		links_t::iterator mHead;
		links_t mChainLinks;
		LLIOPipe::buffer_ptr_t mData;
		// NOTE(review): presumably the "stream is done" marker
		// mentioned in the class overview - confirm.
		bool mEOS;
		LLSD mContext;

		// tracking inside the pump
		// Each entry pairs a pipe with the poll descriptor it registered.
		typedef std::pair<LLIOPipe::ptr_t, apr_pollfd_t> pipe_conditional_t;
		typedef std::vector<pipe_conditional_t> conditionals_t;
		conditionals_t mDescriptors;
	};

	// All the running chains & info
	typedef std::vector<LLChainInfo> pending_chains_t;
	pending_chains_t mPendingChains;
	typedef std::list<LLChainInfo> running_chains_t;
	running_chains_t mRunningChains;

	// Iterator into mRunningChains.
	typedef running_chains_t::iterator current_chain_t;
	current_chain_t mCurrentChain;

	// structures necessary for doing callbacks
	// since the callbacks only get one chance to run, we do not have
	// to maintain a list.
	typedef std::vector<LLChainInfo> callbacks_t;
	callbacks_t mPendingCallbacks;
	callbacks_t mCallbacks;

	// memory allocator for pollsets & mutexes.
	apr_pool_t* mPool;
	apr_pool_t* mCurrentPool;
	S32 mCurrentPoolReallocCount;

	// When LL_THREADS_APR is not enabled the mutex members are declared
	// as plain int pointers so that the member names still exist.
#if LL_THREADS_APR
	apr_thread_mutex_t* mChainsMutex;
	apr_thread_mutex_t* mCallbackMutex;
#else
	int* mChainsMutex;
	int* mCallbackMutex;
#endif

protected:
	// Internal set-up and tear-down helpers.
	void initialize(apr_pool_t* pool);
	void cleanup();

	/**
	 * @brief Given the internal state of the chains, rebuild the pollset
	 * @see setConditional()
	 */
	void rebuildPollset();

	/**
	 * @brief Process the chain passed in.
	 *
	 * This method will potentially modify the internals of the
	 * chain. On end, the chain.mHead will equal
	 * chain.mChainLinks.end().
	 * @param chain The LLChainInfo object to work on.
	 */
	void processChain(LLChainInfo& chain);

	/**
	 * @brief Rewind through the chain to try to recover from an error.
	 *
	 * This method will potentially modify the internals of the
	 * chain.
	 * @param chain The LLChainInfo object to work on.
	 * @param error The error status to recover from.
	 * @return Returns true if someone handled the error
	 */
	bool handleChainError(LLChainInfo& chain, LLIOPipe::EStatus error);
};


#endif // LL_LLPUMPIO_H