diff options
author | Aaron Brashears <aaronb@lindenlab.com> | 2007-12-05 01:15:45 +0000 |
---|---|---|
committer | Aaron Brashears <aaronb@lindenlab.com> | 2007-12-05 01:15:45 +0000 |
commit | 2a9be0445b82fdca0fb98da20f74c0200a9bffe1 (patch) | |
tree | 7dcc2d3727b33d950aa9ea4026a9c3873eabb5ad /indra/llmessage | |
parent | f8511d77a70bea452cde7270b47044358e58427c (diff) |
Result of svn merge -r74235:74242 svn+ssh://svn/svn/linden/branches/robust-pump into release
Diffstat (limited to 'indra/llmessage')
-rw-r--r-- | indra/llmessage/lliopipe.cpp | 1 | ||||
-rw-r--r-- | indra/llmessage/lliopipe.h | 9 | ||||
-rw-r--r-- | indra/llmessage/lliosocket.cpp | 38 | ||||
-rw-r--r-- | indra/llmessage/llpumpio.cpp | 216 | ||||
-rw-r--r-- | indra/llmessage/llpumpio.h | 14 |
5 files changed, 228 insertions, 50 deletions
diff --git a/indra/llmessage/lliopipe.cpp b/indra/llmessage/lliopipe.cpp index 8c0d01f7fc..a0cf5183f1 100644 --- a/indra/llmessage/lliopipe.cpp +++ b/indra/llmessage/lliopipe.cpp @@ -51,6 +51,7 @@ static const std::string STATUS_ERROR_NAMES[LLIOPipe::STATUS_ERROR_COUNT] = std::string("STATUS_NOT_IMPLEMENTED"), std::string("STATUS_PRECONDITION_NOT_MET"), std::string("STATUS_NO_CONNECTION"), + std::string("STATUS_LOST_CONNECTION"), std::string("STATUS_EXPIRED"), }; diff --git a/indra/llmessage/lliopipe.h b/indra/llmessage/lliopipe.h index 47924eecc6..e480f83b55 100644 --- a/indra/llmessage/lliopipe.h +++ b/indra/llmessage/lliopipe.h @@ -148,11 +148,14 @@ public: // This means we could not connect to a remote host. STATUS_NO_CONNECTION = -4, - // This means we could not connect to a remote host. - STATUS_EXPIRED = -5, + // The connection was lost. + STATUS_LOST_CONNECTION = -5, + + // The totoal process time has exceeded the timeout. + STATUS_EXPIRED = -6, // Keep track of the count of codes here. - STATUS_ERROR_COUNT = 5, + STATUS_ERROR_COUNT = 6, }; /** diff --git a/indra/llmessage/lliosocket.cpp b/indra/llmessage/lliosocket.cpp index 920a5e4aa2..b1f55a297c 100644 --- a/indra/llmessage/lliosocket.cpp +++ b/indra/llmessage/lliosocket.cpp @@ -64,6 +64,40 @@ bool is_addr_in_use(apr_status_t status) #endif } +#if LL_LINUX +// Define this to see the actual file descriptors being tossed around. +//#define LL_DEBUG_SOCKET_FILE_DESCRIPTORS 1 +#if LL_DEBUG_SOCKET_FILE_DESCRIPTORS +#include "apr-1/apr_portable.h" +#endif +#endif + + +// Quick function +void ll_debug_socket(const char* msg, apr_socket_t* apr_sock) +{ +#if LL_DEBUG_SOCKET_FILE_DESCRIPTORS + if(!apr_sock) + { + lldebugs << "Socket -- " << (msg?msg:"") << ": no socket." << llendl; + return; + } + // *TODO: Why doesn't this work? + //apr_os_sock_t os_sock; + int os_sock; + if(APR_SUCCESS == apr_os_sock_get(&os_sock, apr_sock)) + { + lldebugs << "Socket -- " << (msg?msg:"") << " on fd " << os_sock + << " at " << apr_sock << llendl; + } + else + { + lldebugs << "Socket -- " << (msg?msg:"") << " no fd " + << " at " << apr_sock << llendl; + } +#endif +} + /// /// LLSocket /// @@ -199,6 +233,7 @@ bool LLSocket::blockingConnect(const LLHost& host) return false; } apr_socket_timeout_set(mSocket, 1000); + ll_debug_socket("Blocking connect", mSocket); if(ll_apr_warn_status(apr_socket_connect(mSocket, sa))) return false; setOptions(); return true; @@ -209,6 +244,7 @@ LLSocket::LLSocket(apr_socket_t* socket, apr_pool_t* pool) : mPool(pool), mPort(PORT_INVALID) { + ll_debug_socket("Constructing wholely formed socket", mSocket); LLMemType m1(LLMemType::MTYPE_IO_TCP); } @@ -216,9 +252,9 @@ LLSocket::~LLSocket() { LLMemType m1(LLMemType::MTYPE_IO_TCP); // *FIX: clean up memory we are holding. - //lldebugs << "Destroying LLSocket" << llendl; if(mSocket) { + ll_debug_socket("Destroying socket", mSocket); apr_socket_close(mSocket); } if(mPool) diff --git a/indra/llmessage/llpumpio.cpp b/indra/llmessage/llpumpio.cpp index ea7c06f38d..c3b298d020 100644 --- a/indra/llmessage/llpumpio.cpp +++ b/indra/llmessage/llpumpio.cpp @@ -34,6 +34,7 @@ #include "linden_common.h" #include "llpumpio.h" +#include <map> #include <set> #include "apr-1/apr_poll.h" @@ -41,10 +42,15 @@ #include "llmemtype.h" #include "llstl.h" -// This should not be in production, but it is intensely useful during -// development. +// These should not be enabled in production, but they can be +// intensely useful during development for finding certain kinds of +// bugs. #if LL_LINUX -#define LL_DEBUG_PIPE_TYPE_IN_PUMP 0 +//#define LL_DEBUG_PIPE_TYPE_IN_PUMP 1 +//#define LL_DEBUG_POLL_FILE_DESCRIPTORS 1 +#if LL_DEBUG_POLL_FILE_DESCRIPTORS +#include "apr-1/apr_portable.h" +#endif #endif #if LL_DEBUG_PIPE_TYPE_IN_PUMP @@ -73,6 +79,52 @@ extern const F32 NEVER_CHAIN_EXPIRY_SECS = 0.0f; //#define LL_DEBUG_SPEW_BUFFER_CHANNEL_IN 1 //#define LL_DEBUG_SPEW_BUFFER_CHANNEL_OUT 1 +// +// local functions +// +void ll_debug_poll_fd(const char* msg, const apr_pollfd_t* poll) +{ +#if LL_DEBUG_POLL_FILE_DESCRIPTORS + if(!poll) + { + lldebugs << "Poll -- " << (msg?msg:"") << ": no pollfd." << llendl; + return; + } + if(poll->desc.s) + { + apr_os_sock_t os_sock; + if(APR_SUCCESS == apr_os_sock_get(&os_sock, poll->desc.s)) + { + lldebugs << "Poll -- " << (msg?msg:"") << " on fd " << os_sock + << " at " << poll->desc.s << llendl; + } + else + { + lldebugs << "Poll -- " << (msg?msg:"") << " no fd " + << " at " << poll->desc.s << llendl; + } + } + else if(poll->desc.f) + { + apr_os_file_t os_file; + if(APR_SUCCESS == apr_os_file_get(&os_file, poll->desc.f)) + { + lldebugs << "Poll -- " << (msg?msg:"") << " on fd " << os_file + << " at " << poll->desc.f << llendl; + } + else + { + lldebugs << "Poll -- " << (msg?msg:"") << " no fd " + << " at " << poll->desc.f << llendl; + } + } + else + { + lldebugs << "Poll -- " << (msg?msg:"") << ": no descriptor." << llendl; + } +#endif +} + /** * @class */ @@ -217,50 +269,88 @@ bool LLPumpIO::setTimeoutSeconds(F32 timeout) return true; } +static std::string events_2_string(apr_int16_t events) +{ + std::ostringstream ostr; + if(events & APR_POLLIN) + { + ostr << "read,"; + } + if(events & APR_POLLPRI) + { + ostr << "priority,"; + } + if(events & APR_POLLOUT) + { + ostr << "write,"; + } + if(events & APR_POLLERR) + { + ostr << "error,"; + } + if(events & APR_POLLHUP) + { + ostr << "hangup,"; + } + if(events & APR_POLLNVAL) + { + ostr << "invalid,"; + } + return chop_tail_copy(ostr.str(), 1); +} + bool LLPumpIO::setConditional(LLIOPipe* pipe, const apr_pollfd_t* poll) { LLMemType m1(LLMemType::MTYPE_IO_PUMP); - //lldebugs << "LLPumpIO::setConditional" << llendl; - if(pipe) + if(!pipe) return false; + ll_debug_poll_fd("Set conditional", poll); + + lldebugs << "Setting conditionals (" << events_2_string(poll->reqevents) + << ") " +#if LL_DEBUG_PIPE_TYPE_IN_PUMP + << "on pipe " << typeid(*pipe).name() +#endif + << " at " << pipe << llendl; + + // remove any matching poll file descriptors for this pipe. + LLIOPipe::ptr_t pipe_ptr(pipe); + LLChainInfo::conditionals_t::iterator it; + it = (*mCurrentChain).mDescriptors.begin(); + while(it != (*mCurrentChain).mDescriptors.end()) { - // remove any matching poll file descriptors for this pipe. - LLIOPipe::ptr_t pipe_ptr(pipe); - LLChainInfo::conditionals_t::iterator it; - it = (*mCurrentChain).mDescriptors.begin(); - while(it != (*mCurrentChain).mDescriptors.end()) + LLChainInfo::pipe_conditional_t& value = (*it); + if(pipe_ptr == value.first) { - LLChainInfo::pipe_conditional_t& value = (*it); - if(pipe_ptr == value.first) - { - ll_delete_apr_pollset_fd_client_data()(value); - it = (*mCurrentChain).mDescriptors.erase(it); - mRebuildPollset = true; - } - else - { - ++it; - } + ll_delete_apr_pollset_fd_client_data()(value); + it = (*mCurrentChain).mDescriptors.erase(it); + mRebuildPollset = true; } - - if(poll) + else { - LLChainInfo::pipe_conditional_t value; - value.first = pipe_ptr; - value.second = *poll; - if(!poll->p) - { - // each fd needs a pool to work with, so if one was - // not specified, use this pool. - // *FIX: Should it always be this pool? - value.second.p = mPool; - } - value.second.client_data = new S32(++mPollsetClientID); - (*mCurrentChain).mDescriptors.push_back(value); - mRebuildPollset = true; + ++it; } + } + + if(!poll) + { + mRebuildPollset = true; return true; } - return false; + LLChainInfo::pipe_conditional_t value; + value.first = pipe_ptr; + value.second = *poll; + value.second.rtnevents = 0; + if(!poll->p) + { + // each fd needs a pool to work with, so if one was + // not specified, use this pool. + // *FIX: Should it always be this pool? + value.second.p = mPool; + } + value.second.client_data = new S32(++mPollsetClientID); + (*mCurrentChain).mDescriptors.push_back(value); + mRebuildPollset = true; + return true; } S32 LLPumpIO::setLock() @@ -412,24 +502,25 @@ void LLPumpIO::pump(const S32& poll_timeout) } // Poll based on the last known pollset - // *FIX: may want to pass in a poll timeout so it works correctly + // *TODO: may want to pass in a poll timeout so it works correctly // in single and multi threaded processes. PUMP_DEBUG; - typedef std::set<S32> signal_client_t; + typedef std::map<S32, S32> signal_client_t; signal_client_t signalled_client; + const apr_pollfd_t* poll_fd = NULL; if(mPollset) { PUMP_DEBUG; //llinfos << "polling" << llendl; S32 count = 0; S32 client_id = 0; - const apr_pollfd_t* poll_fd = NULL; apr_pollset_poll(mPollset, poll_timeout, &count, &poll_fd); PUMP_DEBUG; - for(S32 i = 0; i < count; ++i) + for(S32 ii = 0; ii < count; ++ii) { - client_id = *((S32*)poll_fd[i].client_data); - signalled_client.insert(client_id); + ll_debug_poll_fd("Signalled pipe", &poll_fd[ii]); + client_id = *((S32*)poll_fd[ii].client_data); + signalled_client[client_id] = ii; } PUMP_DEBUG; } @@ -515,16 +606,49 @@ void LLPumpIO::pump(const S32& poll_timeout) LLChainInfo::conditionals_t::iterator end; end = (*run_chain).mDescriptors.end(); S32 client_id = 0; + signal_client_t::iterator signal; for(; it != end; ++it) { PUMP_DEBUG; client_id = *((S32*)((*it).second.client_data)); - if(signalled_client.find(client_id) != not_signalled) + signal = signalled_client.find(client_id); + if (signal == not_signalled) continue; + static const apr_int16_t POLL_CHAIN_ERROR = + APR_POLLHUP | APR_POLLNVAL | APR_POLLERR; + const apr_pollfd_t* poll = &(poll_fd[(*signal).second]); + if(poll->rtnevents & POLL_CHAIN_ERROR) { - process_this_chain = true; + // Potential eror condition has been + // returned. If HUP was one of them, we pass + // that as the error even though there may be + // more. If there are in fact more errors, + // we'll just wait for that detection until + // the next pump() cycle to catch it so that + // the logic here gets no more strained than + // it already is. + LLIOPipe::EStatus error_status; + if(poll->rtnevents & APR_POLLHUP) + error_status = LLIOPipe::STATUS_LOST_CONNECTION; + else + error_status = LLIOPipe::STATUS_ERROR; + if(handleChainError(*run_chain, error_status)) break; + ll_debug_poll_fd("Removing pipe", poll); + llwarns << "Removing pipe " + << (*run_chain).mChainLinks[0].mPipe + << " '" + << typeid( + *((*run_chain).mChainLinks[0].mPipe)).name() + << "' because: " + << events_2_string(poll->rtnevents) + << llendl; + (*run_chain).mHead = (*run_chain).mChainLinks.end(); break; } - //llinfos << "no fd ready for this one." << llendl; + + // at least 1 fd got signalled, and there were no + // errors. That means we process this chain. + process_this_chain = true; + break; } } } diff --git a/indra/llmessage/llpumpio.h b/indra/llmessage/llpumpio.h index 4d865a82ec..1609650f1f 100644 --- a/indra/llmessage/llpumpio.h +++ b/indra/llmessage/llpumpio.h @@ -424,6 +424,20 @@ protected: * @return Retuns true if someone handled the error */ bool handleChainError(LLChainInfo& chain, LLIOPipe::EStatus error); + +public: + /** + * @brief Return number of running chains. + * + * *NOTE: This is only used in debugging and not considered + * efficient or safe enough for production use. + */ + running_chains_t::size_type runningChains() const + { + return mRunningChains.size(); + } + + }; |