summaryrefslogtreecommitdiff
path: root/indra/llmessage/llpumpio.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'indra/llmessage/llpumpio.cpp')
-rw-r--r--indra/llmessage/llpumpio.cpp1006
1 files changed, 1006 insertions, 0 deletions
diff --git a/indra/llmessage/llpumpio.cpp b/indra/llmessage/llpumpio.cpp
new file mode 100644
index 0000000000..0c5bd1eaea
--- /dev/null
+++ b/indra/llmessage/llpumpio.cpp
@@ -0,0 +1,1006 @@
+/**
+ * @file llpumpio.cpp
+ * @author Phoenix
+ * @date 2004-11-21
+ * @brief Implementation of the i/o pump and related functions.
+ *
+ * Copyright (c) 2004-$CurrentYear$, Linden Research, Inc.
+ * $License$
+ */
+
+#include "linden_common.h"
+#include "llpumpio.h"
+
+#include <set>
+#include "apr-1/apr_poll.h"
+
+#include "llapr.h"
+#include "llmemtype.h"
+#include "llstl.h"
+
+// This should not be in production, but it is intensely useful during
+// development.
+#if LL_LINUX
+#define LL_DEBUG_PIPE_TYPE_IN_PUMP 0
+#endif
+
+#if LL_DEBUG_PIPE_TYPE_IN_PUMP
+#include <typeinfo>
+#endif
+
+// constants for poll timeout. if we are threading, we want to have a
+// longer poll timeout.
+#if LL_THREADS_APR
+static const S32 DEFAULT_POLL_TIMEOUT = 1000;
+#else
+static const S32 DEFAULT_POLL_TIMEOUT = 0;
+#endif
+
+// The default (and fallback) expiration time for chains
+const F32 DEFAULT_CHAIN_EXPIRY_SECS = 30.0f;
+extern const F32 SHORT_CHAIN_EXPIRY_SECS = 1.0f;
+extern const F32 NEVER_CHAIN_EXPIRY_SECS = 0.0f;
+
+// sorta spammy debug modes.
+//#define LL_DEBUG_SPEW_BUFFER_CHANNEL_IN_ON_ERROR 1
+//#define LL_DEBUG_PROCESS_LINK 1
+//#define LL_DEBUG_PROCESS_RETURN_VALUE 1
+
+// Super spammy debug mode.
+//#define LL_DEBUG_SPEW_BUFFER_CHANNEL_IN 1
+//#define LL_DEBUG_SPEW_BUFFER_CHANNEL_OUT 1
+
+/**
+ * @class
+ */
+class LLChainSleeper : public LLRunnable
+{
+public:
+ static LLRunner::run_ptr_t build(LLPumpIO* pump, S32 key)
+ {
+ return LLRunner::run_ptr_t(new LLChainSleeper(pump, key));
+ }
+
+ virtual void run(LLRunner* runner, S64 handle)
+ {
+ mPump->clearLock(mKey);
+ }
+
+protected:
+ LLChainSleeper(LLPumpIO* pump, S32 key) : mPump(pump), mKey(key) {}
+ LLPumpIO* mPump;
+ S32 mKey;
+};
+
+
+/**
+ * @struct ll_delete_apr_pollset_fd_client_data
+ * @brief This is a simple helper class to clean up our client data.
+ */
+struct ll_delete_apr_pollset_fd_client_data
+{
+ typedef std::pair<LLIOPipe::ptr_t, apr_pollfd_t> pipe_conditional_t;
+ void operator()(const pipe_conditional_t& conditional)
+ {
+ LLMemType m1(LLMemType::MTYPE_IO_PUMP);
+ S32* client_id = (S32*)conditional.second.client_data;
+ delete client_id;
+ }
+};
+
+/**
+ * LLPumpIO
+ */
+LLPumpIO::LLPumpIO(apr_pool_t* pool) :
+ mState(LLPumpIO::NORMAL),
+ mRebuildPollset(false),
+ mPollset(NULL),
+ mPollsetClientID(0),
+ mNextLock(0),
+ mPool(NULL),
+ mCurrentPool(NULL),
+ mCurrentPoolReallocCount(0),
+ mChainsMutex(NULL),
+ mCallbackMutex(NULL)
+{
+ LLMemType m1(LLMemType::MTYPE_IO_PUMP);
+ initialize(pool);
+}
+
+LLPumpIO::~LLPumpIO()
+{
+ LLMemType m1(LLMemType::MTYPE_IO_PUMP);
+ cleanup();
+}
+
+bool LLPumpIO::prime(apr_pool_t* pool)
+{
+ LLMemType m1(LLMemType::MTYPE_IO_PUMP);
+ cleanup();
+ initialize(pool);
+ return ((pool == NULL) ? false : true);
+}
+
+bool LLPumpIO::addChain(const chain_t& chain, F32 timeout)
+{
+ LLMemType m1(LLMemType::MTYPE_IO_PUMP);
+ if(chain.empty()) return false;
+
+#if LL_THREADS_APR
+ LLScopedLock lock(mChainsMutex);
+#endif
+ LLChainInfo info;
+ info.setTimeoutSeconds(timeout);
+ info.mData = LLIOPipe::buffer_ptr_t(new LLBufferArray);
+ LLLinkInfo link;
+#if LL_DEBUG_PIPE_TYPE_IN_PUMP
+ lldebugs << "LLPumpIO::addChain() " << chain[0] << " '"
+ << typeid(*(chain[0])).name() << "'" << llendl;
+#else
+ lldebugs << "LLPumpIO::addChain() " << chain[0] <<llendl;
+#endif
+ chain_t::const_iterator it = chain.begin();
+ chain_t::const_iterator end = chain.end();
+ for(; it != end; ++it)
+ {
+ link.mPipe = (*it);
+ link.mChannels = info.mData->nextChannel();
+ info.mChainLinks.push_back(link);
+ }
+ mPendingChains.push_back(info);
+ return true;
+}
+
+bool LLPumpIO::addChain(
+ const LLPumpIO::links_t& links,
+ LLIOPipe::buffer_ptr_t data,
+ LLSD context,
+ F32 timeout)
+{
+ LLMemType m1(LLMemType::MTYPE_IO_PUMP);
+
+ // remember that if the caller is providing a full link
+ // description, we need to have that description matched to a
+ // particular buffer.
+ if(!data) return false;
+ if(links.empty()) return false;
+
+#if LL_THREADS_APR
+ LLScopedLock lock(mChainsMutex);
+#endif
+#if LL_DEBUG_PIPE_TYPE_IN_PUMP
+ lldebugs << "LLPumpIO::addChain() " << links[0].mPipe << " '"
+ << typeid(*(links[0].mPipe)).name() << "'" << llendl;
+#else
+ lldebugs << "LLPumpIO::addChain() " << links[0].mPipe << llendl;
+#endif
+ LLChainInfo info;
+ info.setTimeoutSeconds(timeout);
+ info.mChainLinks = links;
+ info.mData = data;
+ info.mContext = context;
+ mPendingChains.push_back(info);
+ return true;
+}
+
+bool LLPumpIO::setTimeoutSeconds(F32 timeout)
+{
+ // If no chain is running, return failure.
+ if(current_chain_t() == mCurrentChain)
+ {
+ return false;
+ }
+ (*mCurrentChain).setTimeoutSeconds(timeout);
+ return true;
+}
+
+bool LLPumpIO::setConditional(LLIOPipe* pipe, const apr_pollfd_t* poll)
+{
+ LLMemType m1(LLMemType::MTYPE_IO_PUMP);
+ //lldebugs << "LLPumpIO::setConditional" << llendl;
+ if(pipe)
+ {
+ // remove any matching poll file descriptors for this pipe.
+ LLIOPipe::ptr_t pipe_ptr(pipe);
+
+ LLChainInfo::conditionals_t::iterator it = (*mCurrentChain).mDescriptors.begin();
+ LLChainInfo::conditionals_t::iterator end = (*mCurrentChain).mDescriptors.end();
+ while (it != end)
+ {
+ LLChainInfo::pipe_conditional_t& value = (*it);
+ if ( pipe_ptr == value.first )
+ {
+ ll_delete_apr_pollset_fd_client_data()(value);
+ (*mCurrentChain).mDescriptors.erase(it++);
+ mRebuildPollset = true;
+ }
+ else
+ {
+ ++it;
+ }
+ }
+
+ if(poll)
+ {
+ LLChainInfo::pipe_conditional_t value;
+ value.first = pipe_ptr;
+ value.second = *poll;
+ if(!poll->p)
+ {
+ // each fd needs a pool to work with, so if one was
+ // not specified, use this pool.
+ // *FIX: Should it always be this pool?
+ value.second.p = mPool;
+ }
+ value.second.client_data = new S32(++mPollsetClientID);
+ (*mCurrentChain).mDescriptors.push_back(value);
+ mRebuildPollset = true;
+ }
+ return true;
+ }
+ return false;
+}
+
+S32 LLPumpIO::setLock()
+{
+ // *NOTE: I do not think it is necessary to acquire a mutex here
+ // since this should only be called during the pump(), and should
+ // only change the running chain. Any other use of this method is
+ // incorrect usage. If it becomes necessary to acquire a lock
+ // here, be sure to lock here and call a protected method to get
+ // the lock, and sleepChain() should probably acquire the same
+ // lock while and calling the same protected implementation to
+ // lock the runner at the same time.
+
+ // If no chain is running, return failure.
+ if(current_chain_t() == mCurrentChain)
+ {
+ return 0;
+ }
+
+ // deal with wrap.
+ if(++mNextLock <= 0)
+ {
+ mNextLock = 1;
+ }
+
+ // set the lock
+ (*mCurrentChain).mLock = mNextLock;
+ return mNextLock;
+}
+
+void LLPumpIO::clearLock(S32 key)
+{
+ // We need to lock it here since we do not want to be iterating
+ // over the chains twice. We can safely call process() while this
+ // is happening since we should not be erasing a locked pipe, and
+ // therefore won't be treading into deleted memory. I think we can
+ // also clear the lock on the chain safely since the pump only
+ // reads that value.
+#if LL_THREADS_APR
+ LLScopedLock lock(mChainsMutex);
+#endif
+ mClearLocks.insert(key);
+}
+
+bool LLPumpIO::sleepChain(F64 seconds)
+{
+ // Much like the call to setLock(), this should only be called
+ // from one chain during processing, so there is no need to
+ // acquire a mutex.
+ if(seconds <= 0.0) return false;
+ S32 key = setLock();
+ if(!key) return false;
+ LLRunner::run_handle_t handle = mRunner.addRunnable(
+ LLChainSleeper::build(this, key),
+ LLRunner::RUN_IN,
+ seconds);
+ if(0 == handle) return false;
+ return true;
+}
+
+bool LLPumpIO::copyCurrentLinkInfo(links_t& links) const
+{
+ LLMemType m1(LLMemType::MTYPE_IO_PUMP);
+ if(current_chain_t() == mCurrentChain)
+ {
+ return false;
+ }
+ std::copy(
+ (*mCurrentChain).mChainLinks.begin(),
+ (*mCurrentChain).mChainLinks.end(),
+ std::back_insert_iterator<links_t>(links));
+ return true;
+}
+
+void LLPumpIO::pump()
+{
+ LLMemType m1(LLMemType::MTYPE_IO_PUMP);
+ //llinfos << "LLPumpIO::pump()" << llendl;
+
+ // Run any pending runners.
+ mRunner.run();
+
+ // We need to move all of the pending heads over to the running
+ // chains.
+ PUMP_DEBUG;
+ if(true)
+ {
+#if LL_THREADS_APR
+ LLScopedLock lock(mChainsMutex);
+#endif
+ // bail if this pump is paused.
+ if(PAUSING == mState)
+ {
+ mState = PAUSED;
+ }
+ if(PAUSED == mState)
+ {
+ return;
+ }
+
+ PUMP_DEBUG;
+ // Move the pending chains over to the running chaings
+ if(!mPendingChains.empty())
+ {
+ PUMP_DEBUG;
+ //lldebugs << "Pushing " << mPendingChains.size() << "." << llendl;
+ std::copy(
+ mPendingChains.begin(),
+ mPendingChains.end(),
+ std::back_insert_iterator<running_chains_t>(mRunningChains));
+ mPendingChains.clear();
+ PUMP_DEBUG;
+ }
+
+ // Clear any locks. This needs to be done here so that we do
+ // not clash during a call to clearLock().
+ if(!mClearLocks.empty())
+ {
+ PUMP_DEBUG;
+ running_chains_t::iterator it = mRunningChains.begin();
+ running_chains_t::iterator end = mRunningChains.end();
+ std::set<S32>::iterator not_cleared = mClearLocks.end();
+ for(; it != end; ++it)
+ {
+ if((*it).mLock && mClearLocks.find((*it).mLock) != not_cleared)
+ {
+ (*it).mLock = 0;
+ }
+ }
+ PUMP_DEBUG;
+ mClearLocks.clear();
+ }
+ }
+
+ PUMP_DEBUG;
+ // rebuild the pollset if necessary
+ if(mRebuildPollset)
+ {
+ PUMP_DEBUG;
+ rebuildPollset();
+ mRebuildPollset = false;
+ }
+
+ // Poll based on the last known pollset
+ // *FIX: may want to pass in a poll timeout so it works correctly
+ // in single and multi threaded processes.
+ PUMP_DEBUG;
+ typedef std::set<S32> signal_client_t;
+ signal_client_t signalled_client;
+ if(mPollset)
+ {
+ PUMP_DEBUG;
+ //llinfos << "polling" << llendl;
+ S32 count = 0;
+ S32 client_id = 0;
+ const apr_pollfd_t* poll_fd = NULL;
+ apr_pollset_poll(mPollset, DEFAULT_POLL_TIMEOUT, &count, &poll_fd);
+ PUMP_DEBUG;
+ for(S32 i = 0; i < count; ++i)
+ {
+ client_id = *((S32*)poll_fd[i].client_data);
+ signalled_client.insert(client_id);
+ }
+ PUMP_DEBUG;
+ }
+
+ PUMP_DEBUG;
+ // set up for a check to see if each one was signalled
+ signal_client_t::iterator not_signalled = signalled_client.end();
+
+ // Process everything as appropriate
+ //lldebugs << "Running chain count: " << mRunningChains.size() << llendl;
+ running_chains_t::iterator run_chain = mRunningChains.begin();
+ bool process_this_chain = false;
+ for(; run_chain != mRunningChains.end(); )
+ {
+ PUMP_DEBUG;
+ if((*run_chain).mInit
+ && (*run_chain).mTimer.getStarted()
+ && (*run_chain).mTimer.hasExpired())
+ {
+ PUMP_DEBUG;
+ if(handleChainError(*run_chain, LLIOPipe::STATUS_EXPIRED))
+ {
+ // the pipe probably handled the error. If the handler
+ // forgot to reset the expiration then we need to do
+ // that here.
+ if((*run_chain).mTimer.getStarted()
+ && (*run_chain).mTimer.hasExpired())
+ {
+ PUMP_DEBUG;
+ llinfos << "Error handler forgot to reset timeout. "
+ << "Resetting to " << DEFAULT_CHAIN_EXPIRY_SECS
+ << " seconds." << llendl;
+ (*run_chain).setTimeoutSeconds(DEFAULT_CHAIN_EXPIRY_SECS);
+ }
+ }
+ else
+ {
+ PUMP_DEBUG;
+ // it timed out and no one handled it, so we need to
+ // retire the chain
+#if LL_DEBUG_PIPE_TYPE_IN_PUMP
+ lldebugs << "Removing chain "
+ << (*run_chain).mChainLinks[0].mPipe
+ << " '"
+ << typeid(*((*run_chain).mChainLinks[0].mPipe)).name()
+ << "' because it timed out." << llendl;
+#else
+// lldebugs << "Removing chain "
+// << (*run_chain).mChainLinks[0].mPipe
+// << " because we reached the end." << llendl;
+#endif
+ mRunningChains.erase(run_chain++);
+ continue;
+ }
+ }
+ PUMP_DEBUG;
+ if((*run_chain).mLock)
+ {
+ ++run_chain;
+ continue;
+ }
+ PUMP_DEBUG;
+ mCurrentChain = run_chain;
+ if((*run_chain).mDescriptors.empty())
+ {
+ // if there are no conditionals, just process this chain.
+ process_this_chain = true;
+ //lldebugs << "no conditionals - processing" << llendl;
+ }
+ else
+ {
+ PUMP_DEBUG;
+ //lldebugs << "checking conditionals" << llendl;
+ // Check if this run chain was signalled. If any file
+ // descriptor is ready for something, then go ahead and
+ // process this chian.
+ process_this_chain = false;
+ if(!signalled_client.empty())
+ {
+ PUMP_DEBUG;
+ LLChainInfo::conditionals_t::iterator it;
+ it = (*run_chain).mDescriptors.begin();
+ LLChainInfo::conditionals_t::iterator end;
+ end = (*run_chain).mDescriptors.end();
+ S32 client_id = 0;
+ for(; it != end; ++it)
+ {
+ PUMP_DEBUG;
+ client_id = *((S32*)((*it).second.client_data));
+ if(signalled_client.find(client_id) != not_signalled)
+ {
+ process_this_chain = true;
+ break;
+ }
+ //llinfos << "no fd ready for this one." << llendl;
+ }
+ }
+ }
+ if(process_this_chain)
+ {
+ PUMP_DEBUG;
+ if(!((*run_chain).mInit))
+ {
+ (*run_chain).mHead = (*run_chain).mChainLinks.begin();
+ (*run_chain).mInit = true;
+ }
+ PUMP_DEBUG;
+ processChain(*run_chain);
+ }
+
+ PUMP_DEBUG;
+ if((*run_chain).mHead == (*run_chain).mChainLinks.end())
+ {
+#if LL_DEBUG_PIPE_TYPE_IN_PUMP
+ lldebugs << "Removing chain " << (*run_chain).mChainLinks[0].mPipe
+ << " '"
+ << typeid(*((*run_chain).mChainLinks[0].mPipe)).name()
+ << "' because we reached the end." << llendl;
+#else
+// lldebugs << "Removing chain " << (*run_chain).mChainLinks[0].mPipe
+// << " because we reached the end." << llendl;
+#endif
+
+ PUMP_DEBUG;
+ // This chain is done. Clean up any allocated memory and
+ // erase the chain info.
+ std::for_each(
+ (*run_chain).mDescriptors.begin(),
+ (*run_chain).mDescriptors.end(),
+ ll_delete_apr_pollset_fd_client_data());
+ mRunningChains.erase(run_chain++);
+
+ // *NOTE: may not always need to rebuild the pollset.
+ mRebuildPollset = true;
+ }
+ else
+ {
+ PUMP_DEBUG;
+ // this chain needs more processing - just go to the next
+ // chain.
+ ++run_chain;
+ }
+ }
+
+ PUMP_DEBUG;
+ // null out the chain
+ mCurrentChain = current_chain_t();
+ END_PUMP_DEBUG;
+}
+
+//bool LLPumpIO::respond(const chain_t& pipes)
+//{
+//#if LL_THREADS_APR
+// LLScopedLock lock(mCallbackMutex);
+//#endif
+// LLChainInfo info;
+// links_t links;
+//
+// mPendingCallbacks.push_back(info);
+// return true;
+//}
+
+bool LLPumpIO::respond(LLIOPipe* pipe)
+{
+ LLMemType m1(LLMemType::MTYPE_IO_PUMP);
+ if(NULL == pipe) return false;
+
+#if LL_THREADS_APR
+ LLScopedLock lock(mCallbackMutex);
+#endif
+ LLChainInfo info;
+ LLLinkInfo link;
+ link.mPipe = pipe;
+ info.mChainLinks.push_back(link);
+ mPendingCallbacks.push_back(info);
+ return true;
+}
+
+bool LLPumpIO::respond(
+ const links_t& links,
+ LLIOPipe::buffer_ptr_t data,
+ LLSD context)
+{
+ LLMemType m1(LLMemType::MTYPE_IO_PUMP);
+ // if the caller is providing a full link description, we need to
+ // have that description matched to a particular buffer.
+ if(!data) return false;
+ if(links.empty()) return false;
+
+#if LL_THREADS_APR
+ LLScopedLock lock(mCallbackMutex);
+#endif
+
+ // Add the callback response
+ LLChainInfo info;
+ info.mChainLinks = links;
+ info.mData = data;
+ info.mContext = context;
+ mPendingCallbacks.push_back(info);
+ return true;
+}
+
+void LLPumpIO::callback()
+{
+ LLMemType m1(LLMemType::MTYPE_IO_PUMP);
+ //llinfos << "LLPumpIO::callback()" << llendl;
+ if(true)
+ {
+#if LL_THREADS_APR
+ LLScopedLock lock(mCallbackMutex);
+#endif
+ std::copy(
+ mPendingCallbacks.begin(),
+ mPendingCallbacks.end(),
+ std::back_insert_iterator<callbacks_t>(mCallbacks));
+ mPendingCallbacks.clear();
+ }
+ if(!mCallbacks.empty())
+ {
+ callbacks_t::iterator it = mCallbacks.begin();
+ callbacks_t::iterator end = mCallbacks.end();
+ for(; it != end; ++it)
+ {
+ // it's always the first and last time for respone chains
+ (*it).mHead = (*it).mChainLinks.begin();
+ (*it).mInit = true;
+ (*it).mEOS = true;
+ processChain(*it);
+ }
+ mCallbacks.clear();
+ }
+}
+
+void LLPumpIO::control(LLPumpIO::EControl op)
+{
+#if LL_THREADS_APR
+ LLScopedLock lock(mChainsMutex);
+#endif
+ switch(op)
+ {
+ case PAUSE:
+ mState = PAUSING;
+ break;
+ case RESUME:
+ mState = NORMAL;
+ break;
+ default:
+ // no-op
+ break;
+ }
+}
+
+void LLPumpIO::initialize(apr_pool_t* pool)
+{
+ LLMemType m1(LLMemType::MTYPE_IO_PUMP);
+ if(!pool) return;
+#if LL_THREADS_APR
+ apr_thread_mutex_create(&mChainsMutex, APR_THREAD_MUTEX_DEFAULT, pool);
+ apr_thread_mutex_create(&mCallbackMutex, APR_THREAD_MUTEX_DEFAULT, pool);
+#endif
+ mPool = pool;
+}
+
+void LLPumpIO::cleanup()
+{
+ LLMemType m1(LLMemType::MTYPE_IO_PUMP);
+#if LL_THREADS_APR
+ if(mChainsMutex) apr_thread_mutex_destroy(mChainsMutex);
+ if(mCallbackMutex) apr_thread_mutex_destroy(mCallbackMutex);
+#endif
+ mChainsMutex = NULL;
+ mCallbackMutex = NULL;
+ if(mPollset)
+ {
+// lldebugs << "cleaning up pollset" << llendl;
+ apr_pollset_destroy(mPollset);
+ mPollset = NULL;
+ }
+ if(mCurrentPool)
+ {
+ apr_pool_destroy(mCurrentPool);
+ mCurrentPool = NULL;
+ }
+ mPool = NULL;
+}
+
+void LLPumpIO::rebuildPollset()
+{
+ LLMemType m1(LLMemType::MTYPE_IO_PUMP);
+// lldebugs << "LLPumpIO::rebuildPollset()" << llendl;
+ if(mPollset)
+ {
+ //lldebugs << "destroying pollset" << llendl;
+ apr_pollset_destroy(mPollset);
+ mPollset = NULL;
+ }
+ U32 size = 0;
+ running_chains_t::iterator run_it = mRunningChains.begin();
+ running_chains_t::iterator run_end = mRunningChains.end();
+ for(; run_it != run_end; ++run_it)
+ {
+ size += (*run_it).mDescriptors.size();
+ }
+ //lldebugs << "found " << size << " descriptors." << llendl;
+ if(size)
+ {
+ // Recycle the memory pool
+ const S32 POLLSET_POOL_RECYCLE_COUNT = 100;
+ if(mCurrentPool
+ && (0 == (++mCurrentPoolReallocCount % POLLSET_POOL_RECYCLE_COUNT)))
+ {
+ apr_pool_destroy(mCurrentPool);
+ mCurrentPool = NULL;
+ mCurrentPoolReallocCount = 0;
+ }
+ if(!mCurrentPool)
+ {
+ apr_status_t status = apr_pool_create(&mCurrentPool, mPool);
+ (void)ll_apr_warn_status(status);
+ }
+
+ // add all of the file descriptors
+ run_it = mRunningChains.begin();
+ LLChainInfo::conditionals_t::iterator fd_it;
+ LLChainInfo::conditionals_t::iterator fd_end;
+ apr_pollset_create(&mPollset, size, mCurrentPool, 0);
+ for(; run_it != run_end; ++run_it)
+ {
+ fd_it = (*run_it).mDescriptors.begin();
+ fd_end = (*run_it).mDescriptors.end();
+ for(; fd_it != fd_end; ++fd_it)
+ {
+ apr_pollset_add(mPollset, &((*fd_it).second));
+ }
+ }
+ }
+}
+
+void LLPumpIO::processChain(LLChainInfo& chain)
+{
+ PUMP_DEBUG;
+ LLMemType m1(LLMemType::MTYPE_IO_PUMP);
+ LLIOPipe::EStatus status = LLIOPipe::STATUS_OK;
+ links_t::iterator it = chain.mHead;
+ links_t::iterator end = chain.mChainLinks.end();
+ bool need_process_signaled = false;
+ bool keep_going = true;
+ do
+ {
+#if LL_DEBUG_PROCESS_LINK
+#if LL_DEBUG_PIPE_TYPE_IN_PUMP
+ llinfos << "Processing " << typeid(*((*it).mPipe)).name() << "."
+ << llendl;
+#else
+ llinfos << "Processing link " << (*it).mPipe << "." << llendl;
+#endif
+#endif
+#if LL_DEBUG_SPEW_BUFFER_CHANNEL_IN
+ if(chain.mData)
+ {
+ char* buf = NULL;
+ S32 bytes = chain.mData->countAfter((*it).mChannels.in(), NULL);
+ if(bytes)
+ {
+ buf = new char[bytes + 1];
+ chain.mData->readAfter(
+ (*it).mChannels.in(),
+ NULL,
+ (U8*)buf,
+ bytes);
+ buf[bytes] = '\0';
+ llinfos << "CHANNEL IN(" << (*it).mChannels.in() << "): "
+ << buf << llendl;
+ delete[] buf;
+ buf = NULL;
+ }
+ else
+ {
+ llinfos << "CHANNEL IN(" << (*it).mChannels.in()<< "): (null)"
+ << llendl;
+ }
+ }
+#endif
+ PUMP_DEBUG;
+ status = (*it).mPipe->process(
+ (*it).mChannels,
+ chain.mData,
+ chain.mEOS,
+ chain.mContext,
+ this);
+#if LL_DEBUG_SPEW_BUFFER_CHANNEL_OUT
+ if(chain.mData)
+ {
+ char* buf = NULL;
+ S32 bytes = chain.mData->countAfter((*it).mChannels.out(), NULL);
+ if(bytes)
+ {
+ buf = new char[bytes + 1];
+ chain.mData->readAfter(
+ (*it).mChannels.out(),
+ NULL,
+ (U8*)buf,
+ bytes);
+ buf[bytes] = '\0';
+ llinfos << "CHANNEL OUT(" << (*it).mChannels.out()<< "): "
+ << buf << llendl;
+ delete[] buf;
+ buf = NULL;
+ }
+ else
+ {
+ llinfos << "CHANNEL OUT(" << (*it).mChannels.out()<< "): (null)"
+ << llendl;
+ }
+ }
+#endif
+
+#if LL_DEBUG_PROCESS_RETURN_VALUE
+ // Only bother with the success codes - error codes are logged
+ // below.
+ if(LLIOPipe::isSuccess(status))
+ {
+ llinfos << "Pipe returned: '"
+#if LL_DEBUG_PIPE_TYPE_IN_PUMP
+ << typeid(*((*it).mPipe)).name() << "':'"
+#endif
+ << LLIOPipe::lookupStatusString(status) << "'" << llendl;
+ }
+#endif
+
+ PUMP_DEBUG;
+ switch(status)
+ {
+ case LLIOPipe::STATUS_OK:
+ // no-op
+ break;
+ case LLIOPipe::STATUS_STOP:
+ PUMP_DEBUG;
+ status = LLIOPipe::STATUS_OK;
+ chain.mHead = end;
+ keep_going = false;
+ break;
+ case LLIOPipe::STATUS_DONE:
+ PUMP_DEBUG;
+ status = LLIOPipe::STATUS_OK;
+ chain.mHead = (it + 1);
+ chain.mEOS = true;
+ break;
+ case LLIOPipe::STATUS_BREAK:
+ PUMP_DEBUG;
+ status = LLIOPipe::STATUS_OK;
+ keep_going = false;
+ break;
+ case LLIOPipe::STATUS_NEED_PROCESS:
+ PUMP_DEBUG;
+ status = LLIOPipe::STATUS_OK;
+ if(!need_process_signaled)
+ {
+ need_process_signaled = true;
+ chain.mHead = it;
+ }
+ break;
+ default:
+ PUMP_DEBUG;
+ if(LLIOPipe::isError(status))
+ {
+ llinfos << "Pump generated pipe error: '"
+#if LL_DEBUG_PIPE_TYPE_IN_PUMP
+ << typeid(*((*it).mPipe)).name() << "':'"
+#endif
+ << LLIOPipe::lookupStatusString(status)
+ << "'" << llendl;
+#if LL_DEBUG_SPEW_BUFFER_CHANNEL_IN_ON_ERROR
+ if(chain.mData)
+ {
+ char* buf = NULL;
+ S32 bytes = chain.mData->countAfter(
+ (*it).mChannels.in(),
+ NULL);
+ if(bytes)
+ {
+ buf = new char[bytes + 1];
+ chain.mData->readAfter(
+ (*it).mChannels.in(),
+ NULL,
+ (U8*)buf,
+ bytes);
+ buf[bytes] = '\0';
+ llinfos << "Input After Error: " << buf << llendl;
+ delete[] buf;
+ buf = NULL;
+ }
+ else
+ {
+ llinfos << "Input After Error: (null)" << llendl;
+ }
+ }
+ else
+ {
+ llinfos << "Input After Error: (null)" << llendl;
+ }
+#endif
+ keep_going = false;
+ chain.mHead = it;
+ if(!handleChainError(chain, status))
+ {
+ chain.mHead = end;
+ }
+ }
+ else
+ {
+ llinfos << "Unhandled status code: " << status << ":"
+ << LLIOPipe::lookupStatusString(status) << llendl;
+ }
+ break;
+ }
+ PUMP_DEBUG;
+ } while(keep_going && (++it != end));
+ PUMP_DEBUG;
+}
+
+bool LLPumpIO::handleChainError(
+ LLChainInfo& chain,
+ LLIOPipe::EStatus error)
+{
+ LLMemType m1(LLMemType::MTYPE_IO_PUMP);
+ links_t::reverse_iterator rit;
+ if(chain.mHead == chain.mChainLinks.end())
+ {
+ rit = links_t::reverse_iterator(chain.mHead);
+ }
+ else
+ {
+ rit = links_t::reverse_iterator(chain.mHead + 1);
+ }
+
+ links_t::reverse_iterator rend = chain.mChainLinks.rend();
+ bool handled = false;
+ bool keep_going = true;
+ do
+ {
+#if LL_DEBUG_PIPE_TYPE_IN_PUMP
+ lldebugs << "Passing error to " << typeid(*((*rit).mPipe)).name()
+ << "." << llendl;
+#endif
+ error = (*rit).mPipe->handleError(error, this);
+ switch(error)
+ {
+ case LLIOPipe::STATUS_OK:
+ handled = true;
+ chain.mHead = rit.base();
+ break;
+ case LLIOPipe::STATUS_STOP:
+ case LLIOPipe::STATUS_DONE:
+ case LLIOPipe::STATUS_BREAK:
+ case LLIOPipe::STATUS_NEED_PROCESS:
+#if LL_DEBUG_PIPE_TYPE_IN_PUMP
+ lldebugs << "Pipe " << typeid(*((*rit).mPipe)).name()
+ << " returned code to stop error handler." << llendl;
+#endif
+ keep_going = false;
+ break;
+ default:
+ if(LLIOPipe::isSuccess(error))
+ {
+ llinfos << "Unhandled status code: " << error << ":"
+ << LLIOPipe::lookupStatusString(error) << llendl;
+ error = LLIOPipe::STATUS_ERROR;
+ keep_going = false;
+ }
+ break;
+ }
+ } while(keep_going && !handled && (++rit != rend));
+ return handled;
+}
+
+/**
+ * LLPumpIO::LLChainInfo
+ */
+
+LLPumpIO::LLChainInfo::LLChainInfo() :
+ mInit(false),
+ mLock(0),
+ mEOS(false)
+{
+ LLMemType m1(LLMemType::MTYPE_IO_PUMP);
+ mTimer.setTimerExpirySec(DEFAULT_CHAIN_EXPIRY_SECS);
+}
+
+void LLPumpIO::LLChainInfo::setTimeoutSeconds(F32 timeout)
+{
+ LLMemType m1(LLMemType::MTYPE_IO_PUMP);
+ if(timeout > 0.0f)
+ {
+ mTimer.start();
+ mTimer.reset();
+ mTimer.setTimerExpirySec(timeout);
+ }
+ else
+ {
+ mTimer.stop();
+ }
+}