diff options
author | Aaron Brashears <aaronb@lindenlab.com> | 2007-12-05 01:15:45 +0000 |
---|---|---|
committer | Aaron Brashears <aaronb@lindenlab.com> | 2007-12-05 01:15:45 +0000 |
commit | 2a9be0445b82fdca0fb98da20f74c0200a9bffe1 (patch) | |
tree | 7dcc2d3727b33d950aa9ea4026a9c3873eabb5ad /indra/llmessage/llpumpio.cpp | |
parent | f8511d77a70bea452cde7270b47044358e58427c (diff) |
Result of svn merge -r74235:74242 svn+ssh://svn/svn/linden/branches/robust-pump into release
Diffstat (limited to 'indra/llmessage/llpumpio.cpp')
-rw-r--r-- | indra/llmessage/llpumpio.cpp | 216 |
1 files changed, 170 insertions, 46 deletions
diff --git a/indra/llmessage/llpumpio.cpp b/indra/llmessage/llpumpio.cpp index ea7c06f38d..c3b298d020 100644 --- a/indra/llmessage/llpumpio.cpp +++ b/indra/llmessage/llpumpio.cpp @@ -34,6 +34,7 @@ #include "linden_common.h" #include "llpumpio.h" +#include <map> #include <set> #include "apr-1/apr_poll.h" @@ -41,10 +42,15 @@ #include "llmemtype.h" #include "llstl.h" -// This should not be in production, but it is intensely useful during -// development. +// These should not be enabled in production, but they can be +// intensely useful during development for finding certain kinds of +// bugs. #if LL_LINUX -#define LL_DEBUG_PIPE_TYPE_IN_PUMP 0 +//#define LL_DEBUG_PIPE_TYPE_IN_PUMP 1 +//#define LL_DEBUG_POLL_FILE_DESCRIPTORS 1 +#if LL_DEBUG_POLL_FILE_DESCRIPTORS +#include "apr-1/apr_portable.h" +#endif #endif #if LL_DEBUG_PIPE_TYPE_IN_PUMP @@ -73,6 +79,52 @@ extern const F32 NEVER_CHAIN_EXPIRY_SECS = 0.0f; //#define LL_DEBUG_SPEW_BUFFER_CHANNEL_IN 1 //#define LL_DEBUG_SPEW_BUFFER_CHANNEL_OUT 1 +// +// local functions +// +void ll_debug_poll_fd(const char* msg, const apr_pollfd_t* poll) +{ +#if LL_DEBUG_POLL_FILE_DESCRIPTORS + if(!poll) + { + lldebugs << "Poll -- " << (msg?msg:"") << ": no pollfd." << llendl; + return; + } + if(poll->desc.s) + { + apr_os_sock_t os_sock; + if(APR_SUCCESS == apr_os_sock_get(&os_sock, poll->desc.s)) + { + lldebugs << "Poll -- " << (msg?msg:"") << " on fd " << os_sock + << " at " << poll->desc.s << llendl; + } + else + { + lldebugs << "Poll -- " << (msg?msg:"") << " no fd " + << " at " << poll->desc.s << llendl; + } + } + else if(poll->desc.f) + { + apr_os_file_t os_file; + if(APR_SUCCESS == apr_os_file_get(&os_file, poll->desc.f)) + { + lldebugs << "Poll -- " << (msg?msg:"") << " on fd " << os_file + << " at " << poll->desc.f << llendl; + } + else + { + lldebugs << "Poll -- " << (msg?msg:"") << " no fd " + << " at " << poll->desc.f << llendl; + } + } + else + { + lldebugs << "Poll -- " << (msg?msg:"") << ": no descriptor." << llendl; + } +#endif +} + /** * @class */ @@ -217,50 +269,88 @@ bool LLPumpIO::setTimeoutSeconds(F32 timeout) return true; } +static std::string events_2_string(apr_int16_t events) +{ + std::ostringstream ostr; + if(events & APR_POLLIN) + { + ostr << "read,"; + } + if(events & APR_POLLPRI) + { + ostr << "priority,"; + } + if(events & APR_POLLOUT) + { + ostr << "write,"; + } + if(events & APR_POLLERR) + { + ostr << "error,"; + } + if(events & APR_POLLHUP) + { + ostr << "hangup,"; + } + if(events & APR_POLLNVAL) + { + ostr << "invalid,"; + } + return chop_tail_copy(ostr.str(), 1); +} + bool LLPumpIO::setConditional(LLIOPipe* pipe, const apr_pollfd_t* poll) { LLMemType m1(LLMemType::MTYPE_IO_PUMP); - //lldebugs << "LLPumpIO::setConditional" << llendl; - if(pipe) + if(!pipe) return false; + ll_debug_poll_fd("Set conditional", poll); + + lldebugs << "Setting conditionals (" << events_2_string(poll->reqevents) + << ") " +#if LL_DEBUG_PIPE_TYPE_IN_PUMP + << "on pipe " << typeid(*pipe).name() +#endif + << " at " << pipe << llendl; + + // remove any matching poll file descriptors for this pipe. + LLIOPipe::ptr_t pipe_ptr(pipe); + LLChainInfo::conditionals_t::iterator it; + it = (*mCurrentChain).mDescriptors.begin(); + while(it != (*mCurrentChain).mDescriptors.end()) { - // remove any matching poll file descriptors for this pipe. - LLIOPipe::ptr_t pipe_ptr(pipe); - LLChainInfo::conditionals_t::iterator it; - it = (*mCurrentChain).mDescriptors.begin(); - while(it != (*mCurrentChain).mDescriptors.end()) + LLChainInfo::pipe_conditional_t& value = (*it); + if(pipe_ptr == value.first) { - LLChainInfo::pipe_conditional_t& value = (*it); - if(pipe_ptr == value.first) - { - ll_delete_apr_pollset_fd_client_data()(value); - it = (*mCurrentChain).mDescriptors.erase(it); - mRebuildPollset = true; - } - else - { - ++it; - } + ll_delete_apr_pollset_fd_client_data()(value); + it = (*mCurrentChain).mDescriptors.erase(it); + mRebuildPollset = true; } - - if(poll) + else { - LLChainInfo::pipe_conditional_t value; - value.first = pipe_ptr; - value.second = *poll; - if(!poll->p) - { - // each fd needs a pool to work with, so if one was - // not specified, use this pool. - // *FIX: Should it always be this pool? - value.second.p = mPool; - } - value.second.client_data = new S32(++mPollsetClientID); - (*mCurrentChain).mDescriptors.push_back(value); - mRebuildPollset = true; + ++it; } + } + + if(!poll) + { + mRebuildPollset = true; return true; } - return false; + LLChainInfo::pipe_conditional_t value; + value.first = pipe_ptr; + value.second = *poll; + value.second.rtnevents = 0; + if(!poll->p) + { + // each fd needs a pool to work with, so if one was + // not specified, use this pool. + // *FIX: Should it always be this pool? + value.second.p = mPool; + } + value.second.client_data = new S32(++mPollsetClientID); + (*mCurrentChain).mDescriptors.push_back(value); + mRebuildPollset = true; + return true; } S32 LLPumpIO::setLock() @@ -412,24 +502,25 @@ void LLPumpIO::pump(const S32& poll_timeout) } // Poll based on the last known pollset - // *FIX: may want to pass in a poll timeout so it works correctly + // *TODO: may want to pass in a poll timeout so it works correctly // in single and multi threaded processes. PUMP_DEBUG; - typedef std::set<S32> signal_client_t; + typedef std::map<S32, S32> signal_client_t; signal_client_t signalled_client; + const apr_pollfd_t* poll_fd = NULL; if(mPollset) { PUMP_DEBUG; //llinfos << "polling" << llendl; S32 count = 0; S32 client_id = 0; - const apr_pollfd_t* poll_fd = NULL; apr_pollset_poll(mPollset, poll_timeout, &count, &poll_fd); PUMP_DEBUG; - for(S32 i = 0; i < count; ++i) + for(S32 ii = 0; ii < count; ++ii) { - client_id = *((S32*)poll_fd[i].client_data); - signalled_client.insert(client_id); + ll_debug_poll_fd("Signalled pipe", &poll_fd[ii]); + client_id = *((S32*)poll_fd[ii].client_data); + signalled_client[client_id] = ii; } PUMP_DEBUG; } @@ -515,16 +606,49 @@ void LLPumpIO::pump(const S32& poll_timeout) LLChainInfo::conditionals_t::iterator end; end = (*run_chain).mDescriptors.end(); S32 client_id = 0; + signal_client_t::iterator signal; for(; it != end; ++it) { PUMP_DEBUG; client_id = *((S32*)((*it).second.client_data)); - if(signalled_client.find(client_id) != not_signalled) + signal = signalled_client.find(client_id); + if (signal == not_signalled) continue; + static const apr_int16_t POLL_CHAIN_ERROR = + APR_POLLHUP | APR_POLLNVAL | APR_POLLERR; + const apr_pollfd_t* poll = &(poll_fd[(*signal).second]); + if(poll->rtnevents & POLL_CHAIN_ERROR) { - process_this_chain = true; + // Potential eror condition has been + // returned. If HUP was one of them, we pass + // that as the error even though there may be + // more. If there are in fact more errors, + // we'll just wait for that detection until + // the next pump() cycle to catch it so that + // the logic here gets no more strained than + // it already is. + LLIOPipe::EStatus error_status; + if(poll->rtnevents & APR_POLLHUP) + error_status = LLIOPipe::STATUS_LOST_CONNECTION; + else + error_status = LLIOPipe::STATUS_ERROR; + if(handleChainError(*run_chain, error_status)) break; + ll_debug_poll_fd("Removing pipe", poll); + llwarns << "Removing pipe " + << (*run_chain).mChainLinks[0].mPipe + << " '" + << typeid( + *((*run_chain).mChainLinks[0].mPipe)).name() + << "' because: " + << events_2_string(poll->rtnevents) + << llendl; + (*run_chain).mHead = (*run_chain).mChainLinks.end(); break; } - //llinfos << "no fd ready for this one." << llendl; + + // at least 1 fd got signalled, and there were no + // errors. That means we process this chain. + process_this_chain = true; + break; } } } |