diff options
Diffstat (limited to 'indra/llmessage/llpumpio.cpp')
-rw-r--r-- | indra/llmessage/llpumpio.cpp | 1760 |
1 files changed, 880 insertions, 880 deletions
diff --git a/indra/llmessage/llpumpio.cpp b/indra/llmessage/llpumpio.cpp index 44720f0015..ec0d7fe4e4 100644 --- a/indra/llmessage/llpumpio.cpp +++ b/indra/llmessage/llpumpio.cpp @@ -1,4 +1,4 @@ -/** +/** * @file llpumpio.cpp * @author Phoenix * @date 2004-11-21 @@ -7,21 +7,21 @@ * $LicenseInfo:firstyear=2004&license=viewerlgpl$ * Second Life Viewer Source Code * Copyright (C) 2010, Linden Research, Inc. - * + * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; * version 2.1 of the License only. - * + * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. - * + * * You should have received a copy of the GNU Lesser General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA - * + * * Linden Research, Inc., 945 Battery Street, San Francisco, CA 94111 USA * $/LicenseInfo$ */ @@ -76,44 +76,44 @@ extern const F32 NEVER_CHAIN_EXPIRY_SECS = 0.0f; void ll_debug_poll_fd(const char* msg, const apr_pollfd_t* poll) { #if LL_DEBUG_POLL_FILE_DESCRIPTORS - if(!poll) - { - LL_DEBUGS() << "Poll -- " << (msg?msg:"") << ": no pollfd." << LL_ENDL; - return; - } - if(poll->desc.s) - { - apr_os_sock_t os_sock; - if(APR_SUCCESS == apr_os_sock_get(&os_sock, poll->desc.s)) - { - LL_DEBUGS() << "Poll -- " << (msg?msg:"") << " on fd " << os_sock - << " at " << poll->desc.s << LL_ENDL; - } - else - { - LL_DEBUGS() << "Poll -- " << (msg?msg:"") << " no fd " - << " at " << poll->desc.s << LL_ENDL; - } - } - else if(poll->desc.f) - { - apr_os_file_t os_file; - if(APR_SUCCESS == apr_os_file_get(&os_file, poll->desc.f)) - { - LL_DEBUGS() << "Poll -- " << (msg?msg:"") << " on fd " << os_file - << " at " << poll->desc.f << LL_ENDL; - } - else - { - LL_DEBUGS() << "Poll -- " << (msg?msg:"") << " no fd " - << " at " << poll->desc.f << LL_ENDL; - } - } - else - { - LL_DEBUGS() << "Poll -- " << (msg?msg:"") << ": no descriptor." << LL_ENDL; - } -#endif + if(!poll) + { + LL_DEBUGS() << "Poll -- " << (msg?msg:"") << ": no pollfd." << LL_ENDL; + return; + } + if(poll->desc.s) + { + apr_os_sock_t os_sock; + if(APR_SUCCESS == apr_os_sock_get(&os_sock, poll->desc.s)) + { + LL_DEBUGS() << "Poll -- " << (msg?msg:"") << " on fd " << os_sock + << " at " << poll->desc.s << LL_ENDL; + } + else + { + LL_DEBUGS() << "Poll -- " << (msg?msg:"") << " no fd " + << " at " << poll->desc.s << LL_ENDL; + } + } + else if(poll->desc.f) + { + apr_os_file_t os_file; + if(APR_SUCCESS == apr_os_file_get(&os_file, poll->desc.f)) + { + LL_DEBUGS() << "Poll -- " << (msg?msg:"") << " on fd " << os_file + << " at " << poll->desc.f << LL_ENDL; + } + else + { + LL_DEBUGS() << "Poll -- " << (msg?msg:"") << " no fd " + << " at " << poll->desc.f << LL_ENDL; + } + } + else + { + LL_DEBUGS() << "Poll -- " << (msg?msg:"") << ": no descriptor." << LL_ENDL; + } +#endif } /** @@ -122,20 +122,20 @@ void ll_debug_poll_fd(const char* msg, const apr_pollfd_t* poll) class LLChainSleeper : public LLRunnable { public: - static LLRunner::run_ptr_t build(LLPumpIO* pump, S32 key) - { - return LLRunner::run_ptr_t(new LLChainSleeper(pump, key)); - } - - virtual void run(LLRunner* runner, S64 handle) - { - mPump->clearLock(mKey); - } + static LLRunner::run_ptr_t build(LLPumpIO* pump, S32 key) + { + return LLRunner::run_ptr_t(new LLChainSleeper(pump, key)); + } + + virtual void run(LLRunner* runner, S64 handle) + { + mPump->clearLock(mKey); + } protected: - LLChainSleeper(LLPumpIO* pump, S32 key) : mPump(pump), mKey(key) {} - LLPumpIO* mPump; - S32 mKey; + LLChainSleeper(LLPumpIO* pump, S32 key) : mPump(pump), mKey(key) {} + LLPumpIO* mPump; + S32 mKey; }; @@ -145,967 +145,967 @@ protected: */ struct ll_delete_apr_pollset_fd_client_data { - typedef std::pair<LLIOPipe::ptr_t, apr_pollfd_t> pipe_conditional_t; - void operator()(const pipe_conditional_t& conditional) - { - S32* client_id = (S32*)conditional.second.client_data; - delete client_id; - } + typedef std::pair<LLIOPipe::ptr_t, apr_pollfd_t> pipe_conditional_t; + void operator()(const pipe_conditional_t& conditional) + { + S32* client_id = (S32*)conditional.second.client_data; + delete client_id; + } }; /** * LLPumpIO */ LLPumpIO::LLPumpIO(apr_pool_t* pool) : - mState(LLPumpIO::NORMAL), - mRebuildPollset(false), - mPollset(NULL), - mPollsetClientID(0), - mNextLock(0), - mPool(NULL), - mCurrentPool(NULL), - mCurrentPoolReallocCount(0), - mCurrentChain(mRunningChains.end()) + mState(LLPumpIO::NORMAL), + mRebuildPollset(false), + mPollset(NULL), + mPollsetClientID(0), + mNextLock(0), + mPool(NULL), + mCurrentPool(NULL), + mCurrentPoolReallocCount(0), + mCurrentChain(mRunningChains.end()) { - mCurrentChain = mRunningChains.end(); + mCurrentChain = mRunningChains.end(); - initialize(pool); + initialize(pool); } LLPumpIO::~LLPumpIO() { - cleanup(); + cleanup(); } bool LLPumpIO::prime(apr_pool_t* pool) { - cleanup(); - initialize(pool); - return ((pool == NULL) ? false : true); + cleanup(); + initialize(pool); + return ((pool == NULL) ? false : true); } bool LLPumpIO::addChain(const chain_t& chain, F32 timeout, bool has_curl_request) { - if(chain.empty()) return false; - - LLChainInfo info; - info.mHasCurlRequest = has_curl_request; - info.setTimeoutSeconds(timeout); - info.mData = LLIOPipe::buffer_ptr_t(new LLBufferArray); - info.mData->setThreaded(has_curl_request); - LLLinkInfo link; + if(chain.empty()) return false; + + LLChainInfo info; + info.mHasCurlRequest = has_curl_request; + info.setTimeoutSeconds(timeout); + info.mData = LLIOPipe::buffer_ptr_t(new LLBufferArray); + info.mData->setThreaded(has_curl_request); + LLLinkInfo link; #if LL_DEBUG_PIPE_TYPE_IN_PUMP - LL_DEBUGS() << "LLPumpIO::addChain() " << chain[0] << " '" - << typeid(*(chain[0])).name() << "'" << LL_ENDL; + LL_DEBUGS() << "LLPumpIO::addChain() " << chain[0] << " '" + << typeid(*(chain[0])).name() << "'" << LL_ENDL; #else - LL_DEBUGS() << "LLPumpIO::addChain() " << chain[0] <<LL_ENDL; + LL_DEBUGS() << "LLPumpIO::addChain() " << chain[0] <<LL_ENDL; #endif - chain_t::const_iterator it = chain.begin(); - chain_t::const_iterator end = chain.end(); - for(; it != end; ++it) - { - link.mPipe = (*it); - link.mChannels = info.mData->nextChannel(); - info.mChainLinks.push_back(link); - } - mPendingChains.push_back(info); - return true; + chain_t::const_iterator it = chain.begin(); + chain_t::const_iterator end = chain.end(); + for(; it != end; ++it) + { + link.mPipe = (*it); + link.mChannels = info.mData->nextChannel(); + info.mChainLinks.push_back(link); + } + mPendingChains.push_back(info); + return true; } bool LLPumpIO::addChain( - const LLPumpIO::links_t& links, - LLIOPipe::buffer_ptr_t data, - LLSD context, - F32 timeout) + const LLPumpIO::links_t& links, + LLIOPipe::buffer_ptr_t data, + LLSD context, + F32 timeout) { - // remember that if the caller is providing a full link - // description, we need to have that description matched to a - // particular buffer. - if(!data) return false; - if(links.empty()) return false; + // remember that if the caller is providing a full link + // description, we need to have that description matched to a + // particular buffer. + if(!data) return false; + if(links.empty()) return false; #if LL_DEBUG_PIPE_TYPE_IN_PUMP - LL_DEBUGS() << "LLPumpIO::addChain() " << links[0].mPipe << " '" - << typeid(*(links[0].mPipe)).name() << "'" << LL_ENDL; + LL_DEBUGS() << "LLPumpIO::addChain() " << links[0].mPipe << " '" + << typeid(*(links[0].mPipe)).name() << "'" << LL_ENDL; #else - LL_DEBUGS() << "LLPumpIO::addChain() " << links[0].mPipe << LL_ENDL; + LL_DEBUGS() << "LLPumpIO::addChain() " << links[0].mPipe << LL_ENDL; #endif - LLChainInfo info; - info.setTimeoutSeconds(timeout); - info.mChainLinks = links; - info.mData = data; - info.mContext = context; - mPendingChains.push_back(info); - return true; + LLChainInfo info; + info.setTimeoutSeconds(timeout); + info.mChainLinks = links; + info.mData = data; + info.mContext = context; + mPendingChains.push_back(info); + return true; } bool LLPumpIO::setTimeoutSeconds(F32 timeout) { - // If no chain is running, return failure. - if(mRunningChains.end() == mCurrentChain) - { - return false; - } - (*mCurrentChain).setTimeoutSeconds(timeout); - return true; + // If no chain is running, return failure. + if(mRunningChains.end() == mCurrentChain) + { + return false; + } + (*mCurrentChain).setTimeoutSeconds(timeout); + return true; } void LLPumpIO::adjustTimeoutSeconds(F32 delta) { - // Ensure a chain is running - if(mRunningChains.end() != mCurrentChain) - { - (*mCurrentChain).adjustTimeoutSeconds(delta); - } + // Ensure a chain is running + if(mRunningChains.end() != mCurrentChain) + { + (*mCurrentChain).adjustTimeoutSeconds(delta); + } } static std::string events_2_string(apr_int16_t events) { - std::ostringstream ostr; - if(events & APR_POLLIN) - { - ostr << "read,"; - } - if(events & APR_POLLPRI) - { - ostr << "priority,"; - } - if(events & APR_POLLOUT) - { - ostr << "write,"; - } - if(events & APR_POLLERR) - { - ostr << "error,"; - } - if(events & APR_POLLHUP) - { - ostr << "hangup,"; - } - if(events & APR_POLLNVAL) - { - ostr << "invalid,"; - } - return chop_tail_copy(ostr.str(), 1); + std::ostringstream ostr; + if(events & APR_POLLIN) + { + ostr << "read,"; + } + if(events & APR_POLLPRI) + { + ostr << "priority,"; + } + if(events & APR_POLLOUT) + { + ostr << "write,"; + } + if(events & APR_POLLERR) + { + ostr << "error,"; + } + if(events & APR_POLLHUP) + { + ostr << "hangup,"; + } + if(events & APR_POLLNVAL) + { + ostr << "invalid,"; + } + return chop_tail_copy(ostr.str(), 1); } bool LLPumpIO::setConditional(LLIOPipe* pipe, const apr_pollfd_t* poll) { - if(!pipe) return false; - ll_debug_poll_fd("Set conditional", poll); + if(!pipe) return false; + ll_debug_poll_fd("Set conditional", poll); - LL_DEBUGS() << "Setting conditionals (" << (poll ? events_2_string(poll->reqevents) :"null") - << ") " + LL_DEBUGS() << "Setting conditionals (" << (poll ? events_2_string(poll->reqevents) :"null") + << ") " #if LL_DEBUG_PIPE_TYPE_IN_PUMP - << "on pipe " << typeid(*pipe).name() + << "on pipe " << typeid(*pipe).name() #endif - << " at " << pipe << LL_ENDL; - - // remove any matching poll file descriptors for this pipe. - LLIOPipe::ptr_t pipe_ptr(pipe); - LLChainInfo::conditionals_t::iterator it; - it = (*mCurrentChain).mDescriptors.begin(); - while(it != (*mCurrentChain).mDescriptors.end()) - { - LLChainInfo::pipe_conditional_t& value = (*it); - if(pipe_ptr == value.first) - { - ll_delete_apr_pollset_fd_client_data()(value); - it = (*mCurrentChain).mDescriptors.erase(it); - mRebuildPollset = true; - } - else - { - ++it; - } - } - - if(!poll) - { - mRebuildPollset = true; - return true; - } - LLChainInfo::pipe_conditional_t value; - value.first = pipe_ptr; - value.second = *poll; - value.second.rtnevents = 0; - if(!poll->p) - { - // each fd needs a pool to work with, so if one was - // not specified, use this pool. - // *FIX: Should it always be this pool? - value.second.p = mPool; - } - value.second.client_data = new S32(++mPollsetClientID); - (*mCurrentChain).mDescriptors.push_back(value); - mRebuildPollset = true; - return true; + << " at " << pipe << LL_ENDL; + + // remove any matching poll file descriptors for this pipe. + LLIOPipe::ptr_t pipe_ptr(pipe); + LLChainInfo::conditionals_t::iterator it; + it = (*mCurrentChain).mDescriptors.begin(); + while(it != (*mCurrentChain).mDescriptors.end()) + { + LLChainInfo::pipe_conditional_t& value = (*it); + if(pipe_ptr == value.first) + { + ll_delete_apr_pollset_fd_client_data()(value); + it = (*mCurrentChain).mDescriptors.erase(it); + mRebuildPollset = true; + } + else + { + ++it; + } + } + + if(!poll) + { + mRebuildPollset = true; + return true; + } + LLChainInfo::pipe_conditional_t value; + value.first = pipe_ptr; + value.second = *poll; + value.second.rtnevents = 0; + if(!poll->p) + { + // each fd needs a pool to work with, so if one was + // not specified, use this pool. + // *FIX: Should it always be this pool? + value.second.p = mPool; + } + value.second.client_data = new S32(++mPollsetClientID); + (*mCurrentChain).mDescriptors.push_back(value); + mRebuildPollset = true; + return true; } S32 LLPumpIO::setLock() { - // *NOTE: I do not think it is necessary to acquire a mutex here - // since this should only be called during the pump(), and should - // only change the running chain. Any other use of this method is - // incorrect usage. If it becomes necessary to acquire a lock - // here, be sure to lock here and call a protected method to get - // the lock, and sleepChain() should probably acquire the same - // lock while and calling the same protected implementation to - // lock the runner at the same time. - - // If no chain is running, return failure. - if(mRunningChains.end() == mCurrentChain) - { - return 0; - } - - // deal with wrap. - if(++mNextLock <= 0) - { - mNextLock = 1; - } - - // set the lock - (*mCurrentChain).mLock = mNextLock; - return mNextLock; + // *NOTE: I do not think it is necessary to acquire a mutex here + // since this should only be called during the pump(), and should + // only change the running chain. Any other use of this method is + // incorrect usage. If it becomes necessary to acquire a lock + // here, be sure to lock here and call a protected method to get + // the lock, and sleepChain() should probably acquire the same + // lock while and calling the same protected implementation to + // lock the runner at the same time. + + // If no chain is running, return failure. + if(mRunningChains.end() == mCurrentChain) + { + return 0; + } + + // deal with wrap. + if(++mNextLock <= 0) + { + mNextLock = 1; + } + + // set the lock + (*mCurrentChain).mLock = mNextLock; + return mNextLock; } void LLPumpIO::clearLock(S32 key) { - // We need to lock it here since we do not want to be iterating - // over the chains twice. We can safely call process() while this - // is happening since we should not be erasing a locked pipe, and - // therefore won't be treading into deleted memory. I think we can - // also clear the lock on the chain safely since the pump only - // reads that value. - mClearLocks.insert(key); + // We need to lock it here since we do not want to be iterating + // over the chains twice. We can safely call process() while this + // is happening since we should not be erasing a locked pipe, and + // therefore won't be treading into deleted memory. I think we can + // also clear the lock on the chain safely since the pump only + // reads that value. + mClearLocks.insert(key); } bool LLPumpIO::sleepChain(F64 seconds) { - // Much like the call to setLock(), this should only be called - // from one chain during processing, so there is no need to - // acquire a mutex. - if(seconds <= 0.0) return false; - S32 key = setLock(); - if(!key) return false; - LLRunner::run_handle_t handle = mRunner.addRunnable( - LLChainSleeper::build(this, key), - LLRunner::RUN_IN, - seconds); - if(0 == handle) return false; - return true; + // Much like the call to setLock(), this should only be called + // from one chain during processing, so there is no need to + // acquire a mutex. + if(seconds <= 0.0) return false; + S32 key = setLock(); + if(!key) return false; + LLRunner::run_handle_t handle = mRunner.addRunnable( + LLChainSleeper::build(this, key), + LLRunner::RUN_IN, + seconds); + if(0 == handle) return false; + return true; } bool LLPumpIO::copyCurrentLinkInfo(links_t& links) const { - if(mRunningChains.end() == mCurrentChain) - { - return false; - } - std::copy( - (*mCurrentChain).mChainLinks.begin(), - (*mCurrentChain).mChainLinks.end(), - std::back_insert_iterator<links_t>(links)); - return true; + if(mRunningChains.end() == mCurrentChain) + { + return false; + } + std::copy( + (*mCurrentChain).mChainLinks.begin(), + (*mCurrentChain).mChainLinks.end(), + std::back_insert_iterator<links_t>(links)); + return true; } void LLPumpIO::pump() { - pump(DEFAULT_POLL_TIMEOUT); + pump(DEFAULT_POLL_TIMEOUT); } -LLPumpIO::current_chain_t LLPumpIO::removeRunningChain(LLPumpIO::current_chain_t& run_chain) +LLPumpIO::current_chain_t LLPumpIO::removeRunningChain(LLPumpIO::current_chain_t& run_chain) { - std::for_each( - (*run_chain).mDescriptors.begin(), - (*run_chain).mDescriptors.end(), - ll_delete_apr_pollset_fd_client_data()); - return mRunningChains.erase(run_chain); + std::for_each( + (*run_chain).mDescriptors.begin(), + (*run_chain).mDescriptors.end(), + ll_delete_apr_pollset_fd_client_data()); + return mRunningChains.erase(run_chain); } //timeout is in microseconds void LLPumpIO::pump(const S32& poll_timeout) { LL_PROFILE_ZONE_SCOPED_CATEGORY_NETWORK; - //LL_INFOS() << "LLPumpIO::pump()" << LL_ENDL; - - // Run any pending runners. - mRunner.run(); - - // We need to move all of the pending heads over to the running - // chains. - PUMP_DEBUG; - if(true) - { - // bail if this pump is paused. - if(PAUSING == mState) - { - mState = PAUSED; - } - if(PAUSED == mState) - { - return; - } - - PUMP_DEBUG; - // Move the pending chains over to the running chaings - if(!mPendingChains.empty()) - { - PUMP_DEBUG; - //LL_DEBUGS() << "Pushing " << mPendingChains.size() << "." << LL_ENDL; - std::copy( - mPendingChains.begin(), - mPendingChains.end(), - std::back_insert_iterator<running_chains_t>(mRunningChains)); - mPendingChains.clear(); - PUMP_DEBUG; - } - - // Clear any locks. This needs to be done here so that we do - // not clash during a call to clearLock(). - if(!mClearLocks.empty()) - { - PUMP_DEBUG; - running_chains_t::iterator it = mRunningChains.begin(); - running_chains_t::iterator end = mRunningChains.end(); - std::set<S32>::iterator not_cleared = mClearLocks.end(); - for(; it != end; ++it) - { - if((*it).mLock && mClearLocks.find((*it).mLock) != not_cleared) - { - (*it).mLock = 0; - } - } - PUMP_DEBUG; - mClearLocks.clear(); - } - } - - PUMP_DEBUG; - // rebuild the pollset if necessary - if(mRebuildPollset) - { - PUMP_DEBUG; - rebuildPollset(); - mRebuildPollset = false; - } - - // Poll based on the last known pollset - // *TODO: may want to pass in a poll timeout so it works correctly - // in single and multi threaded processes. - PUMP_DEBUG; - typedef std::map<S32, S32> signal_client_t; - signal_client_t signalled_client; - const apr_pollfd_t* poll_fd = NULL; - if(mPollset) - { - PUMP_DEBUG; - //LL_INFOS() << "polling" << LL_ENDL; - S32 count = 0; - S32 client_id = 0; + //LL_INFOS() << "LLPumpIO::pump()" << LL_ENDL; + + // Run any pending runners. + mRunner.run(); + + // We need to move all of the pending heads over to the running + // chains. + PUMP_DEBUG; + if(true) + { + // bail if this pump is paused. + if(PAUSING == mState) + { + mState = PAUSED; + } + if(PAUSED == mState) + { + return; + } + + PUMP_DEBUG; + // Move the pending chains over to the running chaings + if(!mPendingChains.empty()) + { + PUMP_DEBUG; + //LL_DEBUGS() << "Pushing " << mPendingChains.size() << "." << LL_ENDL; + std::copy( + mPendingChains.begin(), + mPendingChains.end(), + std::back_insert_iterator<running_chains_t>(mRunningChains)); + mPendingChains.clear(); + PUMP_DEBUG; + } + + // Clear any locks. This needs to be done here so that we do + // not clash during a call to clearLock(). + if(!mClearLocks.empty()) + { + PUMP_DEBUG; + running_chains_t::iterator it = mRunningChains.begin(); + running_chains_t::iterator end = mRunningChains.end(); + std::set<S32>::iterator not_cleared = mClearLocks.end(); + for(; it != end; ++it) + { + if((*it).mLock && mClearLocks.find((*it).mLock) != not_cleared) + { + (*it).mLock = 0; + } + } + PUMP_DEBUG; + mClearLocks.clear(); + } + } + + PUMP_DEBUG; + // rebuild the pollset if necessary + if(mRebuildPollset) + { + PUMP_DEBUG; + rebuildPollset(); + mRebuildPollset = false; + } + + // Poll based on the last known pollset + // *TODO: may want to pass in a poll timeout so it works correctly + // in single and multi threaded processes. + PUMP_DEBUG; + typedef std::map<S32, S32> signal_client_t; + signal_client_t signalled_client; + const apr_pollfd_t* poll_fd = NULL; + if(mPollset) + { + PUMP_DEBUG; + //LL_INFOS() << "polling" << LL_ENDL; + S32 count = 0; + S32 client_id = 0; { LL_PROFILE_ZONE_SCOPED_CATEGORY_NETWORK; apr_pollset_poll(mPollset, poll_timeout, &count, &poll_fd); } - PUMP_DEBUG; - for(S32 ii = 0; ii < count; ++ii) - { - ll_debug_poll_fd("Signalled pipe", &poll_fd[ii]); - client_id = *((S32*)poll_fd[ii].client_data); - signalled_client[client_id] = ii; - } - PUMP_DEBUG; - } - - PUMP_DEBUG; - // set up for a check to see if each one was signalled - signal_client_t::iterator not_signalled = signalled_client.end(); - - // Process everything as appropriate - //LL_DEBUGS() << "Running chain count: " << mRunningChains.size() << LL_ENDL; - running_chains_t::iterator run_chain = mRunningChains.begin(); - bool process_this_chain = false; - while( run_chain != mRunningChains.end() ) - { - PUMP_DEBUG; - if((*run_chain).mInit - && (*run_chain).mTimer.getStarted() - && (*run_chain).mTimer.hasExpired()) - { - PUMP_DEBUG; - if(handleChainError(*run_chain, LLIOPipe::STATUS_EXPIRED)) - { - // the pipe probably handled the error. If the handler - // forgot to reset the expiration then we need to do - // that here. - if((*run_chain).mTimer.getStarted() - && (*run_chain).mTimer.hasExpired()) - { - PUMP_DEBUG; - LL_INFOS() << "Error handler forgot to reset timeout. " - << "Resetting to " << DEFAULT_CHAIN_EXPIRY_SECS - << " seconds." << LL_ENDL; - (*run_chain).setTimeoutSeconds(DEFAULT_CHAIN_EXPIRY_SECS); - } - } - else - { - PUMP_DEBUG; - // it timed out and no one handled it, so we need to - // retire the chain + PUMP_DEBUG; + for(S32 ii = 0; ii < count; ++ii) + { + ll_debug_poll_fd("Signalled pipe", &poll_fd[ii]); + client_id = *((S32*)poll_fd[ii].client_data); + signalled_client[client_id] = ii; + } + PUMP_DEBUG; + } + + PUMP_DEBUG; + // set up for a check to see if each one was signalled + signal_client_t::iterator not_signalled = signalled_client.end(); + + // Process everything as appropriate + //LL_DEBUGS() << "Running chain count: " << mRunningChains.size() << LL_ENDL; + running_chains_t::iterator run_chain = mRunningChains.begin(); + bool process_this_chain = false; + while( run_chain != mRunningChains.end() ) + { + PUMP_DEBUG; + if((*run_chain).mInit + && (*run_chain).mTimer.getStarted() + && (*run_chain).mTimer.hasExpired()) + { + PUMP_DEBUG; + if(handleChainError(*run_chain, LLIOPipe::STATUS_EXPIRED)) + { + // the pipe probably handled the error. If the handler + // forgot to reset the expiration then we need to do + // that here. + if((*run_chain).mTimer.getStarted() + && (*run_chain).mTimer.hasExpired()) + { + PUMP_DEBUG; + LL_INFOS() << "Error handler forgot to reset timeout. " + << "Resetting to " << DEFAULT_CHAIN_EXPIRY_SECS + << " seconds." << LL_ENDL; + (*run_chain).setTimeoutSeconds(DEFAULT_CHAIN_EXPIRY_SECS); + } + } + else + { + PUMP_DEBUG; + // it timed out and no one handled it, so we need to + // retire the chain #if LL_DEBUG_PIPE_TYPE_IN_PUMP - LL_DEBUGS() << "Removing chain " - << (*run_chain).mChainLinks[0].mPipe - << " '" - << typeid(*((*run_chain).mChainLinks[0].mPipe)).name() - << "' because it timed out." << LL_ENDL; + LL_DEBUGS() << "Removing chain " + << (*run_chain).mChainLinks[0].mPipe + << " '" + << typeid(*((*run_chain).mChainLinks[0].mPipe)).name() + << "' because it timed out." << LL_ENDL; #else -// LL_DEBUGS() << "Removing chain " -// << (*run_chain).mChainLinks[0].mPipe -// << " because we reached the end." << LL_ENDL; +// LL_DEBUGS() << "Removing chain " +// << (*run_chain).mChainLinks[0].mPipe +// << " because we reached the end." << LL_ENDL; #endif - run_chain = removeRunningChain(run_chain); - continue; - } - } - else if(isChainExpired(*run_chain)) - { - run_chain = removeRunningChain(run_chain); - continue; - } - - PUMP_DEBUG; - if((*run_chain).mLock) - { - ++run_chain; - continue; - } - PUMP_DEBUG; - mCurrentChain = run_chain; - - if((*run_chain).mDescriptors.empty()) - { - // if there are no conditionals, just process this chain. - process_this_chain = true; - //LL_DEBUGS() << "no conditionals - processing" << LL_ENDL; - } - else - { - PUMP_DEBUG; - //LL_DEBUGS() << "checking conditionals" << LL_ENDL; - // Check if this run chain was signalled. If any file - // descriptor is ready for something, then go ahead and - // process this chian. - process_this_chain = false; - if(!signalled_client.empty()) - { - PUMP_DEBUG; - LLChainInfo::conditionals_t::iterator it; - it = (*run_chain).mDescriptors.begin(); - LLChainInfo::conditionals_t::iterator end; - end = (*run_chain).mDescriptors.end(); - S32 client_id = 0; - signal_client_t::iterator signal; - for(; it != end; ++it) - { - PUMP_DEBUG; - client_id = *((S32*)((*it).second.client_data)); - signal = signalled_client.find(client_id); - if (signal == not_signalled) continue; - static const apr_int16_t POLL_CHAIN_ERROR = - APR_POLLHUP | APR_POLLNVAL | APR_POLLERR; - const apr_pollfd_t* poll = &(poll_fd[(*signal).second]); - if(poll->rtnevents & POLL_CHAIN_ERROR) - { - // Potential eror condition has been - // returned. If HUP was one of them, we pass - // that as the error even though there may be - // more. If there are in fact more errors, - // we'll just wait for that detection until - // the next pump() cycle to catch it so that - // the logic here gets no more strained than - // it already is. - LLIOPipe::EStatus error_status; - if(poll->rtnevents & APR_POLLHUP) - error_status = LLIOPipe::STATUS_LOST_CONNECTION; - else - error_status = LLIOPipe::STATUS_ERROR; - if(handleChainError(*run_chain, error_status)) break; - ll_debug_poll_fd("Removing pipe", poll); - LL_WARNS() << "Removing pipe " - << (*run_chain).mChainLinks[0].mPipe - << " '" + run_chain = removeRunningChain(run_chain); + continue; + } + } + else if(isChainExpired(*run_chain)) + { + run_chain = removeRunningChain(run_chain); + continue; + } + + PUMP_DEBUG; + if((*run_chain).mLock) + { + ++run_chain; + continue; + } + PUMP_DEBUG; + mCurrentChain = run_chain; + + if((*run_chain).mDescriptors.empty()) + { + // if there are no conditionals, just process this chain. + process_this_chain = true; + //LL_DEBUGS() << "no conditionals - processing" << LL_ENDL; + } + else + { + PUMP_DEBUG; + //LL_DEBUGS() << "checking conditionals" << LL_ENDL; + // Check if this run chain was signalled. If any file + // descriptor is ready for something, then go ahead and + // process this chian. + process_this_chain = false; + if(!signalled_client.empty()) + { + PUMP_DEBUG; + LLChainInfo::conditionals_t::iterator it; + it = (*run_chain).mDescriptors.begin(); + LLChainInfo::conditionals_t::iterator end; + end = (*run_chain).mDescriptors.end(); + S32 client_id = 0; + signal_client_t::iterator signal; + for(; it != end; ++it) + { + PUMP_DEBUG; + client_id = *((S32*)((*it).second.client_data)); + signal = signalled_client.find(client_id); + if (signal == not_signalled) continue; + static const apr_int16_t POLL_CHAIN_ERROR = + APR_POLLHUP | APR_POLLNVAL | APR_POLLERR; + const apr_pollfd_t* poll = &(poll_fd[(*signal).second]); + if(poll->rtnevents & POLL_CHAIN_ERROR) + { + // Potential eror condition has been + // returned. If HUP was one of them, we pass + // that as the error even though there may be + // more. If there are in fact more errors, + // we'll just wait for that detection until + // the next pump() cycle to catch it so that + // the logic here gets no more strained than + // it already is. + LLIOPipe::EStatus error_status; + if(poll->rtnevents & APR_POLLHUP) + error_status = LLIOPipe::STATUS_LOST_CONNECTION; + else + error_status = LLIOPipe::STATUS_ERROR; + if(handleChainError(*run_chain, error_status)) break; + ll_debug_poll_fd("Removing pipe", poll); + LL_WARNS() << "Removing pipe " + << (*run_chain).mChainLinks[0].mPipe + << " '" #if LL_DEBUG_PIPE_TYPE_IN_PUMP - << typeid( - *((*run_chain).mChainLinks[0].mPipe)).name() + << typeid( + *((*run_chain).mChainLinks[0].mPipe)).name() #endif - << "' because: " - << events_2_string(poll->rtnevents) - << LL_ENDL; - (*run_chain).mHead = (*run_chain).mChainLinks.end(); - break; - } - - // at least 1 fd got signalled, and there were no - // errors. That means we process this chain. - process_this_chain = true; - break; - } - } - } - if(process_this_chain) - { - PUMP_DEBUG; - if(!((*run_chain).mInit)) - { - (*run_chain).mHead = (*run_chain).mChainLinks.begin(); - (*run_chain).mInit = true; - } - PUMP_DEBUG; - processChain(*run_chain); - } - - PUMP_DEBUG; - if((*run_chain).mHead == (*run_chain).mChainLinks.end()) - { + << "' because: " + << events_2_string(poll->rtnevents) + << LL_ENDL; + (*run_chain).mHead = (*run_chain).mChainLinks.end(); + break; + } + + // at least 1 fd got signalled, and there were no + // errors. That means we process this chain. + process_this_chain = true; + break; + } + } + } + if(process_this_chain) + { + PUMP_DEBUG; + if(!((*run_chain).mInit)) + { + (*run_chain).mHead = (*run_chain).mChainLinks.begin(); + (*run_chain).mInit = true; + } + PUMP_DEBUG; + processChain(*run_chain); + } + + PUMP_DEBUG; + if((*run_chain).mHead == (*run_chain).mChainLinks.end()) + { #if LL_DEBUG_PIPE_TYPE_IN_PUMP - LL_DEBUGS() << "Removing chain " << (*run_chain).mChainLinks[0].mPipe - << " '" - << typeid(*((*run_chain).mChainLinks[0].mPipe)).name() - << "' because we reached the end." << LL_ENDL; + LL_DEBUGS() << "Removing chain " << (*run_chain).mChainLinks[0].mPipe + << " '" + << typeid(*((*run_chain).mChainLinks[0].mPipe)).name() + << "' because we reached the end." << LL_ENDL; #else -// LL_DEBUGS() << "Removing chain " << (*run_chain).mChainLinks[0].mPipe -// << " because we reached the end." << LL_ENDL; +// LL_DEBUGS() << "Removing chain " << (*run_chain).mChainLinks[0].mPipe +// << " because we reached the end." << LL_ENDL; #endif - PUMP_DEBUG; - // This chain is done. Clean up any allocated memory and - // erase the chain info. - run_chain = removeRunningChain(run_chain); - - // *NOTE: may not always need to rebuild the pollset. - mRebuildPollset = true; - } - else - { - PUMP_DEBUG; - // this chain needs more processing - just go to the next - // chain. - ++run_chain; - } - } - - PUMP_DEBUG; - // null out the chain - mCurrentChain = mRunningChains.end(); - END_PUMP_DEBUG; + PUMP_DEBUG; + // This chain is done. Clean up any allocated memory and + // erase the chain info. + run_chain = removeRunningChain(run_chain); + + // *NOTE: may not always need to rebuild the pollset. + mRebuildPollset = true; + } + else + { + PUMP_DEBUG; + // this chain needs more processing - just go to the next + // chain. + ++run_chain; + } + } + + PUMP_DEBUG; + // null out the chain + mCurrentChain = mRunningChains.end(); + END_PUMP_DEBUG; } bool LLPumpIO::respond(LLIOPipe* pipe) { - if(NULL == pipe) return false; - - LLChainInfo info; - LLLinkInfo link; - link.mPipe = pipe; - info.mChainLinks.push_back(link); - mPendingCallbacks.push_back(info); - return true; + if(NULL == pipe) return false; + + LLChainInfo info; + LLLinkInfo link; + link.mPipe = pipe; + info.mChainLinks.push_back(link); + mPendingCallbacks.push_back(info); + return true; } bool LLPumpIO::respond( - const links_t& links, - LLIOPipe::buffer_ptr_t data, - LLSD context) + const links_t& links, + LLIOPipe::buffer_ptr_t data, + LLSD context) { - // if the caller is providing a full link description, we need to - // have that description matched to a particular buffer. - if(!data) return false; - if(links.empty()) return false; - - // Add the callback response - LLChainInfo info; - info.mChainLinks = links; - info.mData = data; - info.mContext = context; - mPendingCallbacks.push_back(info); - return true; + // if the caller is providing a full link description, we need to + // have that description matched to a particular buffer. + if(!data) return false; + if(links.empty()) return false; + + // Add the callback response + LLChainInfo info; + info.mChainLinks = links; + info.mData = data; + info.mContext = context; + mPendingCallbacks.push_back(info); + return true; } void LLPumpIO::callback() { LL_PROFILE_ZONE_SCOPED_CATEGORY_NETWORK; - //LL_INFOS() << "LLPumpIO::callback()" << LL_ENDL; - if(true) - { - std::copy( - mPendingCallbacks.begin(), - mPendingCallbacks.end(), - std::back_insert_iterator<callbacks_t>(mCallbacks)); - mPendingCallbacks.clear(); - } - if(!mCallbacks.empty()) - { - callbacks_t::iterator it = mCallbacks.begin(); - callbacks_t::iterator end = mCallbacks.end(); - for(; it != end; ++it) - { - // it's always the first and last time for respone chains - (*it).mHead = (*it).mChainLinks.begin(); - (*it).mInit = true; - (*it).mEOS = true; - processChain(*it); - } - mCallbacks.clear(); - } + //LL_INFOS() << "LLPumpIO::callback()" << LL_ENDL; + if(true) + { + std::copy( + mPendingCallbacks.begin(), + mPendingCallbacks.end(), + std::back_insert_iterator<callbacks_t>(mCallbacks)); + mPendingCallbacks.clear(); + } + if(!mCallbacks.empty()) + { + callbacks_t::iterator it = mCallbacks.begin(); + callbacks_t::iterator end = mCallbacks.end(); + for(; it != end; ++it) + { + // it's always the first and last time for respone chains + (*it).mHead = (*it).mChainLinks.begin(); + (*it).mInit = true; + (*it).mEOS = true; + processChain(*it); + } + mCallbacks.clear(); + } } void LLPumpIO::control(LLPumpIO::EControl op) { - switch(op) - { - case PAUSE: - mState = PAUSING; - break; - case RESUME: - mState = NORMAL; - break; - default: - // no-op - break; - } + switch(op) + { + case PAUSE: + mState = PAUSING; + break; + case RESUME: + mState = NORMAL; + break; + default: + // no-op + break; + } } void LLPumpIO::initialize(apr_pool_t* pool) { - if(!pool) return; - mPool = pool; + if(!pool) return; + mPool = pool; } void LLPumpIO::cleanup() { - if(mPollset) - { -// LL_DEBUGS() << "cleaning up pollset" << LL_ENDL; - apr_pollset_destroy(mPollset); - mPollset = NULL; - } - if(mCurrentPool) - { - apr_pool_destroy(mCurrentPool); - mCurrentPool = NULL; - } - mPool = NULL; + if(mPollset) + { +// LL_DEBUGS() << "cleaning up pollset" << LL_ENDL; + apr_pollset_destroy(mPollset); + mPollset = NULL; + } + if(mCurrentPool) + { + apr_pool_destroy(mCurrentPool); + mCurrentPool = NULL; + } + mPool = NULL; } void LLPumpIO::rebuildPollset() { -// LL_DEBUGS() << "LLPumpIO::rebuildPollset()" << LL_ENDL; - if(mPollset) - { - //LL_DEBUGS() << "destroying pollset" << LL_ENDL; - apr_pollset_destroy(mPollset); - mPollset = NULL; - } - U32 size = 0; - running_chains_t::iterator run_it = mRunningChains.begin(); - running_chains_t::iterator run_end = mRunningChains.end(); - for(; run_it != run_end; ++run_it) - { - size += (*run_it).mDescriptors.size(); - } - //LL_DEBUGS() << "found " << size << " descriptors." << LL_ENDL; - if(size) - { - // Recycle the memory pool - const S32 POLLSET_POOL_RECYCLE_COUNT = 100; - if(mCurrentPool - && (0 == (++mCurrentPoolReallocCount % POLLSET_POOL_RECYCLE_COUNT))) - { - apr_pool_destroy(mCurrentPool); - mCurrentPool = NULL; - mCurrentPoolReallocCount = 0; - } - if(!mCurrentPool) - { - apr_status_t status = apr_pool_create(&mCurrentPool, mPool); - (void)ll_apr_warn_status(status); - } - - // add all of the file descriptors - run_it = mRunningChains.begin(); - LLChainInfo::conditionals_t::iterator fd_it; - LLChainInfo::conditionals_t::iterator fd_end; - apr_pollset_create(&mPollset, size, mCurrentPool, 0); - for(; run_it != run_end; ++run_it) - { - fd_it = (*run_it).mDescriptors.begin(); - fd_end = (*run_it).mDescriptors.end(); - for(; fd_it != fd_end; ++fd_it) - { - apr_pollset_add(mPollset, &((*fd_it).second)); - } - } - } +// LL_DEBUGS() << "LLPumpIO::rebuildPollset()" << LL_ENDL; + if(mPollset) + { + //LL_DEBUGS() << "destroying pollset" << LL_ENDL; + apr_pollset_destroy(mPollset); + mPollset = NULL; + } + U32 size = 0; + running_chains_t::iterator run_it = mRunningChains.begin(); + running_chains_t::iterator run_end = mRunningChains.end(); + for(; run_it != run_end; ++run_it) + { + size += (*run_it).mDescriptors.size(); + } + //LL_DEBUGS() << "found " << size << " descriptors." << LL_ENDL; + if(size) + { + // Recycle the memory pool + const S32 POLLSET_POOL_RECYCLE_COUNT = 100; + if(mCurrentPool + && (0 == (++mCurrentPoolReallocCount % POLLSET_POOL_RECYCLE_COUNT))) + { + apr_pool_destroy(mCurrentPool); + mCurrentPool = NULL; + mCurrentPoolReallocCount = 0; + } + if(!mCurrentPool) + { + apr_status_t status = apr_pool_create(&mCurrentPool, mPool); + (void)ll_apr_warn_status(status); + } + + // add all of the file descriptors + run_it = mRunningChains.begin(); + LLChainInfo::conditionals_t::iterator fd_it; + LLChainInfo::conditionals_t::iterator fd_end; + apr_pollset_create(&mPollset, size, mCurrentPool, 0); + for(; run_it != run_end; ++run_it) + { + fd_it = (*run_it).mDescriptors.begin(); + fd_end = (*run_it).mDescriptors.end(); + for(; fd_it != fd_end; ++fd_it) + { + apr_pollset_add(mPollset, &((*fd_it).second)); + } + } + } } void LLPumpIO::processChain(LLChainInfo& chain) { - PUMP_DEBUG; - LLIOPipe::EStatus status = LLIOPipe::STATUS_OK; - links_t::iterator it = chain.mHead; - links_t::iterator end = chain.mChainLinks.end(); - bool need_process_signaled = false; - bool keep_going = true; - do - { + PUMP_DEBUG; + LLIOPipe::EStatus status = LLIOPipe::STATUS_OK; + links_t::iterator it = chain.mHead; + links_t::iterator end = chain.mChainLinks.end(); + bool need_process_signaled = false; + bool keep_going = true; + do + { #if LL_DEBUG_PROCESS_LINK #if LL_DEBUG_PIPE_TYPE_IN_PUMP - LL_INFOS() << "Processing " << typeid(*((*it).mPipe)).name() << "." - << LL_ENDL; + LL_INFOS() << "Processing " << typeid(*((*it).mPipe)).name() << "." + << LL_ENDL; #else - LL_INFOS() << "Processing link " << (*it).mPipe << "." << LL_ENDL; + LL_INFOS() << "Processing link " << (*it).mPipe << "." << LL_ENDL; #endif #endif #if LL_DEBUG_SPEW_BUFFER_CHANNEL_IN - if(chain.mData) - { - char* buf = NULL; - S32 bytes = chain.mData->countAfter((*it).mChannels.in(), NULL); - if(bytes) - { - buf = new char[bytes + 1]; - chain.mData->readAfter( - (*it).mChannels.in(), - NULL, - (U8*)buf, - bytes); - buf[bytes] = '\0'; - LL_INFOS() << "CHANNEL IN(" << (*it).mChannels.in() << "): " - << buf << LL_ENDL; - delete[] buf; - buf = NULL; - } - else - { - LL_INFOS() << "CHANNEL IN(" << (*it).mChannels.in()<< "): (null)" - << LL_ENDL; - } - } + if(chain.mData) + { + char* buf = NULL; + S32 bytes = chain.mData->countAfter((*it).mChannels.in(), NULL); + if(bytes) + { + buf = new char[bytes + 1]; + chain.mData->readAfter( + (*it).mChannels.in(), + NULL, + (U8*)buf, + bytes); + buf[bytes] = '\0'; + LL_INFOS() << "CHANNEL IN(" << (*it).mChannels.in() << "): " + << buf << LL_ENDL; + delete[] buf; + buf = NULL; + } + else + { + LL_INFOS() << "CHANNEL IN(" << (*it).mChannels.in()<< "): (null)" + << LL_ENDL; + } + } #endif - PUMP_DEBUG; - status = (*it).mPipe->process( - (*it).mChannels, - chain.mData, - chain.mEOS, - chain.mContext, - this); + PUMP_DEBUG; + status = (*it).mPipe->process( + (*it).mChannels, + chain.mData, + chain.mEOS, + chain.mContext, + this); #if LL_DEBUG_SPEW_BUFFER_CHANNEL_OUT - if(chain.mData) - { - char* buf = NULL; - S32 bytes = chain.mData->countAfter((*it).mChannels.out(), NULL); - if(bytes) - { - buf = new char[bytes + 1]; - chain.mData->readAfter( - (*it).mChannels.out(), - NULL, - (U8*)buf, - bytes); - buf[bytes] = '\0'; - LL_INFOS() << "CHANNEL OUT(" << (*it).mChannels.out()<< "): " - << buf << LL_ENDL; - delete[] buf; - buf = NULL; - } - else - { - LL_INFOS() << "CHANNEL OUT(" << (*it).mChannels.out()<< "): (null)" - << LL_ENDL; - } - } + if(chain.mData) + { + char* buf = NULL; + S32 bytes = chain.mData->countAfter((*it).mChannels.out(), NULL); + if(bytes) + { + buf = new char[bytes + 1]; + chain.mData->readAfter( + (*it).mChannels.out(), + NULL, + (U8*)buf, + bytes); + buf[bytes] = '\0'; + LL_INFOS() << "CHANNEL OUT(" << (*it).mChannels.out()<< "): " + << buf << LL_ENDL; + delete[] buf; + buf = NULL; + } + else + { + LL_INFOS() << "CHANNEL OUT(" << (*it).mChannels.out()<< "): (null)" + << LL_ENDL; + } + } #endif #if LL_DEBUG_PROCESS_RETURN_VALUE - // Only bother with the success codes - error codes are logged - // below. - if(LLIOPipe::isSuccess(status)) - { - LL_INFOS() << "Pipe returned: '" + // Only bother with the success codes - error codes are logged + // below. + if(LLIOPipe::isSuccess(status)) + { + LL_INFOS() << "Pipe returned: '" #if LL_DEBUG_PIPE_TYPE_IN_PUMP - << typeid(*((*it).mPipe)).name() << "':'" + << typeid(*((*it).mPipe)).name() << "':'" #endif - << LLIOPipe::lookupStatusString(status) << "'" << LL_ENDL; - } + << LLIOPipe::lookupStatusString(status) << "'" << LL_ENDL; + } #endif - PUMP_DEBUG; - switch(status) - { - case LLIOPipe::STATUS_OK: - // no-op - break; - case LLIOPipe::STATUS_STOP: - PUMP_DEBUG; - status = LLIOPipe::STATUS_OK; - chain.mHead = end; - keep_going = false; - break; - case LLIOPipe::STATUS_DONE: - PUMP_DEBUG; - status = LLIOPipe::STATUS_OK; - chain.mHead = (it + 1); - chain.mEOS = true; - break; - case LLIOPipe::STATUS_BREAK: - PUMP_DEBUG; - status = LLIOPipe::STATUS_OK; - keep_going = false; - break; - case LLIOPipe::STATUS_NEED_PROCESS: - PUMP_DEBUG; - status = LLIOPipe::STATUS_OK; - if(!need_process_signaled) - { - need_process_signaled = true; - chain.mHead = it; - } - break; - default: - PUMP_DEBUG; - if(LLIOPipe::isError(status)) - { - LL_INFOS() << "Pump generated pipe err: '" + PUMP_DEBUG; + switch(status) + { + case LLIOPipe::STATUS_OK: + // no-op + break; + case LLIOPipe::STATUS_STOP: + PUMP_DEBUG; + status = LLIOPipe::STATUS_OK; + chain.mHead = end; + keep_going = false; + break; + case LLIOPipe::STATUS_DONE: + PUMP_DEBUG; + status = LLIOPipe::STATUS_OK; + chain.mHead = (it + 1); + chain.mEOS = true; + break; + case LLIOPipe::STATUS_BREAK: + PUMP_DEBUG; + status = LLIOPipe::STATUS_OK; + keep_going = false; + break; + case LLIOPipe::STATUS_NEED_PROCESS: + PUMP_DEBUG; + status = LLIOPipe::STATUS_OK; + if(!need_process_signaled) + { + need_process_signaled = true; + chain.mHead = it; + } + break; + default: + PUMP_DEBUG; + if(LLIOPipe::isError(status)) + { + LL_INFOS() << "Pump generated pipe err: '" #if LL_DEBUG_PIPE_TYPE_IN_PUMP - << typeid(*((*it).mPipe)).name() << "':'" + << typeid(*((*it).mPipe)).name() << "':'" #endif - << LLIOPipe::lookupStatusString(status) - << "'" << LL_ENDL; + << LLIOPipe::lookupStatusString(status) + << "'" << LL_ENDL; #if LL_DEBUG_SPEW_BUFFER_CHANNEL_IN_ON_ERROR - if(chain.mData) - { - char* buf = NULL; - S32 bytes = chain.mData->countAfter( - (*it).mChannels.in(), - NULL); - if(bytes) - { - buf = new char[bytes + 1]; - chain.mData->readAfter( - (*it).mChannels.in(), - NULL, - (U8*)buf, - bytes); - buf[bytes] = '\0'; - LL_INFOS() << "Input After Error: " << buf << LL_ENDL; - delete[] buf; - buf = NULL; - } - else - { - LL_INFOS() << "Input After Error: (null)" << LL_ENDL; - } - } - else - { - LL_INFOS() << "Input After Error: (null)" << LL_ENDL; - } + if(chain.mData) + { + char* buf = NULL; + S32 bytes = chain.mData->countAfter( + (*it).mChannels.in(), + NULL); + if(bytes) + { + buf = new char[bytes + 1]; + chain.mData->readAfter( + (*it).mChannels.in(), + NULL, + (U8*)buf, + bytes); + buf[bytes] = '\0'; + LL_INFOS() << "Input After Error: " << buf << LL_ENDL; + delete[] buf; + buf = NULL; + } + else + { + LL_INFOS() << "Input After Error: (null)" << LL_ENDL; + } + } + else + { + LL_INFOS() << "Input After Error: (null)" << LL_ENDL; + } #endif - keep_going = false; - chain.mHead = it; - if(!handleChainError(chain, status)) - { - chain.mHead = end; - } - } - else - { - LL_INFOS() << "Unhandled status code: " << status << ":" - << LLIOPipe::lookupStatusString(status) << LL_ENDL; - } - break; - } - PUMP_DEBUG; - } while(keep_going && (++it != end)); - PUMP_DEBUG; + keep_going = false; + chain.mHead = it; + if(!handleChainError(chain, status)) + { + chain.mHead = end; + } + } + else + { + LL_INFOS() << "Unhandled status code: " << status << ":" + << LLIOPipe::lookupStatusString(status) << LL_ENDL; + } + break; + } + PUMP_DEBUG; + } while(keep_going && (++it != end)); + PUMP_DEBUG; } bool LLPumpIO::isChainExpired(LLChainInfo& chain) { - if(!chain.mHasCurlRequest) - { - return false ; - } - - for(links_t::iterator iter = chain.mChainLinks.begin(); iter != chain.mChainLinks.end(); ++iter) - { - if(!(*iter).mPipe->isValid()) - { - return true ; - } - } - - return false ; + if(!chain.mHasCurlRequest) + { + return false ; + } + + for(links_t::iterator iter = chain.mChainLinks.begin(); iter != chain.mChainLinks.end(); ++iter) + { + if(!(*iter).mPipe->isValid()) + { + return true ; + } + } + + return false ; } bool LLPumpIO::handleChainError( - LLChainInfo& chain, - LLIOPipe::EStatus error) + LLChainInfo& chain, + LLIOPipe::EStatus error) { - links_t::reverse_iterator rit; - if(chain.mHead == chain.mChainLinks.end()) - { - rit = links_t::reverse_iterator(chain.mHead); - } - else - { - rit = links_t::reverse_iterator(chain.mHead + 1); - } - - links_t::reverse_iterator rend = chain.mChainLinks.rend(); - bool handled = false; - bool keep_going = true; - do - { + links_t::reverse_iterator rit; + if(chain.mHead == chain.mChainLinks.end()) + { + rit = links_t::reverse_iterator(chain.mHead); + } + else + { + rit = links_t::reverse_iterator(chain.mHead + 1); + } + + links_t::reverse_iterator rend = chain.mChainLinks.rend(); + bool handled = false; + bool keep_going = true; + do + { #if LL_DEBUG_PIPE_TYPE_IN_PUMP - LL_DEBUGS() << "Passing error to " << typeid(*((*rit).mPipe)).name() - << "." << LL_ENDL; + LL_DEBUGS() << "Passing error to " << typeid(*((*rit).mPipe)).name() + << "." << LL_ENDL; #endif - error = (*rit).mPipe->handleError(error, this); - switch(error) - { - case LLIOPipe::STATUS_OK: - handled = true; - chain.mHead = rit.base(); - break; - case LLIOPipe::STATUS_STOP: - case LLIOPipe::STATUS_DONE: - case LLIOPipe::STATUS_BREAK: - case LLIOPipe::STATUS_NEED_PROCESS: + error = (*rit).mPipe->handleError(error, this); + switch(error) + { + case LLIOPipe::STATUS_OK: + handled = true; + chain.mHead = rit.base(); + break; + case LLIOPipe::STATUS_STOP: + case LLIOPipe::STATUS_DONE: + case LLIOPipe::STATUS_BREAK: + case LLIOPipe::STATUS_NEED_PROCESS: #if LL_DEBUG_PIPE_TYPE_IN_PUMP - LL_DEBUGS() << "Pipe " << typeid(*((*rit).mPipe)).name() - << " returned code to stop error handler." << LL_ENDL; + LL_DEBUGS() << "Pipe " << typeid(*((*rit).mPipe)).name() + << " returned code to stop error handler." << LL_ENDL; #endif - keep_going = false; - break; - case LLIOPipe::STATUS_EXPIRED: - keep_going = false; - break ; - default: - if(LLIOPipe::isSuccess(error)) - { - LL_INFOS() << "Unhandled status code: " << error << ":" - << LLIOPipe::lookupStatusString(error) << LL_ENDL; - error = LLIOPipe::STATUS_ERROR; - keep_going = false; - } - break; - } - } while(keep_going && !handled && (++rit != rend)); - return handled; + keep_going = false; + break; + case LLIOPipe::STATUS_EXPIRED: + keep_going = false; + break ; + default: + if(LLIOPipe::isSuccess(error)) + { + LL_INFOS() << "Unhandled status code: " << error << ":" + << LLIOPipe::lookupStatusString(error) << LL_ENDL; + error = LLIOPipe::STATUS_ERROR; + keep_going = false; + } + break; + } + } while(keep_going && !handled && (++rit != rend)); + return handled; } /** @@ -1113,34 +1113,34 @@ bool LLPumpIO::handleChainError( */ LLPumpIO::LLChainInfo::LLChainInfo() : - mInit(false), - mLock(0), - mEOS(false), - mHasCurlRequest(false) + mInit(false), + mLock(0), + mEOS(false), + mHasCurlRequest(false) { - mTimer.setTimerExpirySec(DEFAULT_CHAIN_EXPIRY_SECS); + mTimer.setTimerExpirySec(DEFAULT_CHAIN_EXPIRY_SECS); } void LLPumpIO::LLChainInfo::setTimeoutSeconds(F32 timeout) { - if(timeout > 0.0f) - { - mTimer.start(); - mTimer.reset(); - mTimer.setTimerExpirySec(timeout); - } - else - { - mTimer.stop(); - } + if(timeout > 0.0f) + { + mTimer.start(); + mTimer.reset(); + mTimer.setTimerExpirySec(timeout); + } + else + { + mTimer.stop(); + } } void LLPumpIO::LLChainInfo::adjustTimeoutSeconds(F32 delta) { - if(mTimer.getStarted()) - { - F64 expiry = mTimer.expiresAt(); - expiry += delta; - mTimer.setExpiryAt(expiry); - } + if(mTimer.getStarted()) + { + F64 expiry = mTimer.expiresAt(); + expiry += delta; + mTimer.setExpiryAt(expiry); + } } |