diff options
-rw-r--r-- | indra/llmessage/lliopipe.cpp | 1 | ||||
-rw-r--r-- | indra/llmessage/lliopipe.h | 9 | ||||
-rw-r--r-- | indra/llmessage/lliosocket.cpp | 38 | ||||
-rw-r--r-- | indra/llmessage/llpumpio.cpp | 216 | ||||
-rw-r--r-- | indra/llmessage/llpumpio.h | 14 | ||||
-rw-r--r-- | indra/test/io.cpp | 64 | ||||
-rw-r--r-- | indra/test/llpipeutil.cpp | 22 | ||||
-rw-r--r-- | indra/test/llpipeutil.h | 20 | ||||
-rw-r--r-- | indra/test/test.cpp | 9 |
9 files changed, 341 insertions, 52 deletions
diff --git a/indra/llmessage/lliopipe.cpp b/indra/llmessage/lliopipe.cpp index 8c0d01f7fc..a0cf5183f1 100644 --- a/indra/llmessage/lliopipe.cpp +++ b/indra/llmessage/lliopipe.cpp @@ -51,6 +51,7 @@ static const std::string STATUS_ERROR_NAMES[LLIOPipe::STATUS_ERROR_COUNT] = std::string("STATUS_NOT_IMPLEMENTED"), std::string("STATUS_PRECONDITION_NOT_MET"), std::string("STATUS_NO_CONNECTION"), + std::string("STATUS_LOST_CONNECTION"), std::string("STATUS_EXPIRED"), }; diff --git a/indra/llmessage/lliopipe.h b/indra/llmessage/lliopipe.h index 47924eecc6..e480f83b55 100644 --- a/indra/llmessage/lliopipe.h +++ b/indra/llmessage/lliopipe.h @@ -148,11 +148,14 @@ public: // This means we could not connect to a remote host. STATUS_NO_CONNECTION = -4, - // This means we could not connect to a remote host. - STATUS_EXPIRED = -5, + // The connection was lost. + STATUS_LOST_CONNECTION = -5, + + // The totoal process time has exceeded the timeout. + STATUS_EXPIRED = -6, // Keep track of the count of codes here. - STATUS_ERROR_COUNT = 5, + STATUS_ERROR_COUNT = 6, }; /** diff --git a/indra/llmessage/lliosocket.cpp b/indra/llmessage/lliosocket.cpp index 920a5e4aa2..b1f55a297c 100644 --- a/indra/llmessage/lliosocket.cpp +++ b/indra/llmessage/lliosocket.cpp @@ -64,6 +64,40 @@ bool is_addr_in_use(apr_status_t status) #endif } +#if LL_LINUX +// Define this to see the actual file descriptors being tossed around. +//#define LL_DEBUG_SOCKET_FILE_DESCRIPTORS 1 +#if LL_DEBUG_SOCKET_FILE_DESCRIPTORS +#include "apr-1/apr_portable.h" +#endif +#endif + + +// Quick function +void ll_debug_socket(const char* msg, apr_socket_t* apr_sock) +{ +#if LL_DEBUG_SOCKET_FILE_DESCRIPTORS + if(!apr_sock) + { + lldebugs << "Socket -- " << (msg?msg:"") << ": no socket." << llendl; + return; + } + // *TODO: Why doesn't this work? + //apr_os_sock_t os_sock; + int os_sock; + if(APR_SUCCESS == apr_os_sock_get(&os_sock, apr_sock)) + { + lldebugs << "Socket -- " << (msg?msg:"") << " on fd " << os_sock + << " at " << apr_sock << llendl; + } + else + { + lldebugs << "Socket -- " << (msg?msg:"") << " no fd " + << " at " << apr_sock << llendl; + } +#endif +} + /// /// LLSocket /// @@ -199,6 +233,7 @@ bool LLSocket::blockingConnect(const LLHost& host) return false; } apr_socket_timeout_set(mSocket, 1000); + ll_debug_socket("Blocking connect", mSocket); if(ll_apr_warn_status(apr_socket_connect(mSocket, sa))) return false; setOptions(); return true; @@ -209,6 +244,7 @@ LLSocket::LLSocket(apr_socket_t* socket, apr_pool_t* pool) : mPool(pool), mPort(PORT_INVALID) { + ll_debug_socket("Constructing wholely formed socket", mSocket); LLMemType m1(LLMemType::MTYPE_IO_TCP); } @@ -216,9 +252,9 @@ LLSocket::~LLSocket() { LLMemType m1(LLMemType::MTYPE_IO_TCP); // *FIX: clean up memory we are holding. - //lldebugs << "Destroying LLSocket" << llendl; if(mSocket) { + ll_debug_socket("Destroying socket", mSocket); apr_socket_close(mSocket); } if(mPool) diff --git a/indra/llmessage/llpumpio.cpp b/indra/llmessage/llpumpio.cpp index ea7c06f38d..c3b298d020 100644 --- a/indra/llmessage/llpumpio.cpp +++ b/indra/llmessage/llpumpio.cpp @@ -34,6 +34,7 @@ #include "linden_common.h" #include "llpumpio.h" +#include <map> #include <set> #include "apr-1/apr_poll.h" @@ -41,10 +42,15 @@ #include "llmemtype.h" #include "llstl.h" -// This should not be in production, but it is intensely useful during -// development. +// These should not be enabled in production, but they can be +// intensely useful during development for finding certain kinds of +// bugs. #if LL_LINUX -#define LL_DEBUG_PIPE_TYPE_IN_PUMP 0 +//#define LL_DEBUG_PIPE_TYPE_IN_PUMP 1 +//#define LL_DEBUG_POLL_FILE_DESCRIPTORS 1 +#if LL_DEBUG_POLL_FILE_DESCRIPTORS +#include "apr-1/apr_portable.h" +#endif #endif #if LL_DEBUG_PIPE_TYPE_IN_PUMP @@ -73,6 +79,52 @@ extern const F32 NEVER_CHAIN_EXPIRY_SECS = 0.0f; //#define LL_DEBUG_SPEW_BUFFER_CHANNEL_IN 1 //#define LL_DEBUG_SPEW_BUFFER_CHANNEL_OUT 1 +// +// local functions +// +void ll_debug_poll_fd(const char* msg, const apr_pollfd_t* poll) +{ +#if LL_DEBUG_POLL_FILE_DESCRIPTORS + if(!poll) + { + lldebugs << "Poll -- " << (msg?msg:"") << ": no pollfd." << llendl; + return; + } + if(poll->desc.s) + { + apr_os_sock_t os_sock; + if(APR_SUCCESS == apr_os_sock_get(&os_sock, poll->desc.s)) + { + lldebugs << "Poll -- " << (msg?msg:"") << " on fd " << os_sock + << " at " << poll->desc.s << llendl; + } + else + { + lldebugs << "Poll -- " << (msg?msg:"") << " no fd " + << " at " << poll->desc.s << llendl; + } + } + else if(poll->desc.f) + { + apr_os_file_t os_file; + if(APR_SUCCESS == apr_os_file_get(&os_file, poll->desc.f)) + { + lldebugs << "Poll -- " << (msg?msg:"") << " on fd " << os_file + << " at " << poll->desc.f << llendl; + } + else + { + lldebugs << "Poll -- " << (msg?msg:"") << " no fd " + << " at " << poll->desc.f << llendl; + } + } + else + { + lldebugs << "Poll -- " << (msg?msg:"") << ": no descriptor." << llendl; + } +#endif +} + /** * @class */ @@ -217,50 +269,88 @@ bool LLPumpIO::setTimeoutSeconds(F32 timeout) return true; } +static std::string events_2_string(apr_int16_t events) +{ + std::ostringstream ostr; + if(events & APR_POLLIN) + { + ostr << "read,"; + } + if(events & APR_POLLPRI) + { + ostr << "priority,"; + } + if(events & APR_POLLOUT) + { + ostr << "write,"; + } + if(events & APR_POLLERR) + { + ostr << "error,"; + } + if(events & APR_POLLHUP) + { + ostr << "hangup,"; + } + if(events & APR_POLLNVAL) + { + ostr << "invalid,"; + } + return chop_tail_copy(ostr.str(), 1); +} + bool LLPumpIO::setConditional(LLIOPipe* pipe, const apr_pollfd_t* poll) { LLMemType m1(LLMemType::MTYPE_IO_PUMP); - //lldebugs << "LLPumpIO::setConditional" << llendl; - if(pipe) + if(!pipe) return false; + ll_debug_poll_fd("Set conditional", poll); + + lldebugs << "Setting conditionals (" << events_2_string(poll->reqevents) + << ") " +#if LL_DEBUG_PIPE_TYPE_IN_PUMP + << "on pipe " << typeid(*pipe).name() +#endif + << " at " << pipe << llendl; + + // remove any matching poll file descriptors for this pipe. + LLIOPipe::ptr_t pipe_ptr(pipe); + LLChainInfo::conditionals_t::iterator it; + it = (*mCurrentChain).mDescriptors.begin(); + while(it != (*mCurrentChain).mDescriptors.end()) { - // remove any matching poll file descriptors for this pipe. - LLIOPipe::ptr_t pipe_ptr(pipe); - LLChainInfo::conditionals_t::iterator it; - it = (*mCurrentChain).mDescriptors.begin(); - while(it != (*mCurrentChain).mDescriptors.end()) + LLChainInfo::pipe_conditional_t& value = (*it); + if(pipe_ptr == value.first) { - LLChainInfo::pipe_conditional_t& value = (*it); - if(pipe_ptr == value.first) - { - ll_delete_apr_pollset_fd_client_data()(value); - it = (*mCurrentChain).mDescriptors.erase(it); - mRebuildPollset = true; - } - else - { - ++it; - } + ll_delete_apr_pollset_fd_client_data()(value); + it = (*mCurrentChain).mDescriptors.erase(it); + mRebuildPollset = true; } - - if(poll) + else { - LLChainInfo::pipe_conditional_t value; - value.first = pipe_ptr; - value.second = *poll; - if(!poll->p) - { - // each fd needs a pool to work with, so if one was - // not specified, use this pool. - // *FIX: Should it always be this pool? - value.second.p = mPool; - } - value.second.client_data = new S32(++mPollsetClientID); - (*mCurrentChain).mDescriptors.push_back(value); - mRebuildPollset = true; + ++it; } + } + + if(!poll) + { + mRebuildPollset = true; return true; } - return false; + LLChainInfo::pipe_conditional_t value; + value.first = pipe_ptr; + value.second = *poll; + value.second.rtnevents = 0; + if(!poll->p) + { + // each fd needs a pool to work with, so if one was + // not specified, use this pool. + // *FIX: Should it always be this pool? + value.second.p = mPool; + } + value.second.client_data = new S32(++mPollsetClientID); + (*mCurrentChain).mDescriptors.push_back(value); + mRebuildPollset = true; + return true; } S32 LLPumpIO::setLock() @@ -412,24 +502,25 @@ void LLPumpIO::pump(const S32& poll_timeout) } // Poll based on the last known pollset - // *FIX: may want to pass in a poll timeout so it works correctly + // *TODO: may want to pass in a poll timeout so it works correctly // in single and multi threaded processes. PUMP_DEBUG; - typedef std::set<S32> signal_client_t; + typedef std::map<S32, S32> signal_client_t; signal_client_t signalled_client; + const apr_pollfd_t* poll_fd = NULL; if(mPollset) { PUMP_DEBUG; //llinfos << "polling" << llendl; S32 count = 0; S32 client_id = 0; - const apr_pollfd_t* poll_fd = NULL; apr_pollset_poll(mPollset, poll_timeout, &count, &poll_fd); PUMP_DEBUG; - for(S32 i = 0; i < count; ++i) + for(S32 ii = 0; ii < count; ++ii) { - client_id = *((S32*)poll_fd[i].client_data); - signalled_client.insert(client_id); + ll_debug_poll_fd("Signalled pipe", &poll_fd[ii]); + client_id = *((S32*)poll_fd[ii].client_data); + signalled_client[client_id] = ii; } PUMP_DEBUG; } @@ -515,16 +606,49 @@ void LLPumpIO::pump(const S32& poll_timeout) LLChainInfo::conditionals_t::iterator end; end = (*run_chain).mDescriptors.end(); S32 client_id = 0; + signal_client_t::iterator signal; for(; it != end; ++it) { PUMP_DEBUG; client_id = *((S32*)((*it).second.client_data)); - if(signalled_client.find(client_id) != not_signalled) + signal = signalled_client.find(client_id); + if (signal == not_signalled) continue; + static const apr_int16_t POLL_CHAIN_ERROR = + APR_POLLHUP | APR_POLLNVAL | APR_POLLERR; + const apr_pollfd_t* poll = &(poll_fd[(*signal).second]); + if(poll->rtnevents & POLL_CHAIN_ERROR) { - process_this_chain = true; + // Potential eror condition has been + // returned. If HUP was one of them, we pass + // that as the error even though there may be + // more. If there are in fact more errors, + // we'll just wait for that detection until + // the next pump() cycle to catch it so that + // the logic here gets no more strained than + // it already is. + LLIOPipe::EStatus error_status; + if(poll->rtnevents & APR_POLLHUP) + error_status = LLIOPipe::STATUS_LOST_CONNECTION; + else + error_status = LLIOPipe::STATUS_ERROR; + if(handleChainError(*run_chain, error_status)) break; + ll_debug_poll_fd("Removing pipe", poll); + llwarns << "Removing pipe " + << (*run_chain).mChainLinks[0].mPipe + << " '" + << typeid( + *((*run_chain).mChainLinks[0].mPipe)).name() + << "' because: " + << events_2_string(poll->rtnevents) + << llendl; + (*run_chain).mHead = (*run_chain).mChainLinks.end(); break; } - //llinfos << "no fd ready for this one." << llendl; + + // at least 1 fd got signalled, and there were no + // errors. That means we process this chain. + process_this_chain = true; + break; } } } diff --git a/indra/llmessage/llpumpio.h b/indra/llmessage/llpumpio.h index 4d865a82ec..1609650f1f 100644 --- a/indra/llmessage/llpumpio.h +++ b/indra/llmessage/llpumpio.h @@ -424,6 +424,20 @@ protected: * @return Retuns true if someone handled the error */ bool handleChainError(LLChainInfo& chain, LLIOPipe::EStatus error); + +public: + /** + * @brief Return number of running chains. + * + * *NOTE: This is only used in debugging and not considered + * efficient or safe enough for production use. + */ + running_chains_t::size_type runningChains() const + { + return mRunningChains.size(); + } + + }; diff --git a/indra/test/io.cpp b/indra/test/io.cpp index 363f375014..350fc5394b 100644 --- a/indra/test/io.cpp +++ b/indra/test/io.cpp @@ -1080,7 +1080,7 @@ namespace tut mPool, mSocket, factory); - server->setResponseTimeout(SHORT_CHAIN_EXPIRY_SECS + 2.0f); + server->setResponseTimeout(SHORT_CHAIN_EXPIRY_SECS + 1.80f); chain.push_back(LLIOPipe::ptr_t(server)); mPump->addChain(chain, NEVER_CHAIN_EXPIRY_SECS); @@ -1108,6 +1108,68 @@ namespace tut F32 elapsed = pump_loop(mPump, SHORT_CHAIN_EXPIRY_SECS + 3.0f); ensure("Did not take too long", (elapsed < DEFAULT_CHAIN_EXPIRY_SECS)); } + + template<> template<> + void fitness_test_object::test<5>() + { + // Set up the server + LLPumpIO::chain_t chain; + typedef LLCloneIOFactory<LLIOSleeper> sleeper_t; + sleeper_t* sleeper = new sleeper_t(new LLIOSleeper); + boost::shared_ptr<LLChainIOFactory> factory(sleeper); + LLIOServerSocket* server = new LLIOServerSocket( + mPool, + mSocket, + factory); + server->setResponseTimeout(1.0); + chain.push_back(LLIOPipe::ptr_t(server)); + mPump->addChain(chain, NEVER_CHAIN_EXPIRY_SECS); + // We need to tickle the pump a little to set up the listen() + pump_loop(mPump, 0.1f); + U32 count = mPump->runningChains(); + ensure_equals("server chain onboard", count, 1); + lldebugs << "** Server is up." << llendl; + + // Set up the client + LLSocket::ptr_t client = LLSocket::create(mPool, LLSocket::STREAM_TCP); + LLHost server_host("127.0.0.1", SERVER_LISTEN_PORT); + bool connected = client->blockingConnect(server_host); + ensure("Connected to server", connected); + lldebugs << "connected" << llendl; + F32 elapsed = pump_loop(mPump,0.1f); + count = mPump->runningChains(); + ensure_equals("server chain onboard", count, 2); + lldebugs << "** Client is connected." << llendl; + + // We have connected, since the socket reader does not block, + // the first call to read data will return EAGAIN, so we need + // to write something. + chain.clear(); + chain.push_back(LLIOPipe::ptr_t(new LLPipeStringInjector("hi"))); + chain.push_back(LLIOPipe::ptr_t(new LLIOSocketWriter(client))); + chain.push_back(LLIOPipe::ptr_t(new LLIONull)); + mPump->addChain(chain, 0.2); + chain.clear(); + + // pump for a bit and make sure all 3 chains are running + elapsed = pump_loop(mPump,0.1f); + count = mPump->runningChains(); + ensure_equals("client chain onboard", count, 3); + lldebugs << "** request should have been sent." << llendl; + + // pump for long enough the the client socket closes, and the + // server socket should not be closed yet. + elapsed = pump_loop(mPump,0.2f); + count = mPump->runningChains(); + ensure_equals("client chain timed out ", count, 2); + lldebugs << "** client chain should be closed." << llendl; + + // At this point, the socket should be closed by the timeout + elapsed = pump_loop(mPump,1.0f); + count = mPump->runningChains(); + ensure_equals("accepted socked close", count, 1); + lldebugs << "** Sleeper should have timed out.." << llendl; + } } namespace tut diff --git a/indra/test/llpipeutil.cpp b/indra/test/llpipeutil.cpp index b7b9122615..c9c1eeb8b4 100644 --- a/indra/test/llpipeutil.cpp +++ b/indra/test/llpipeutil.cpp @@ -164,3 +164,25 @@ LLIOPipe::EStatus LLIONull::process_impl( { return STATUS_OK; } + +// virtual +LLIOPipe::EStatus LLIOSleeper::process_impl( + const LLChannelDescriptors& channels, + buffer_ptr_t& buffer, + bool& eos, + LLSD& context, + LLPumpIO* pump) +{ + if(!mRespond) + { + lldebugs << "LLIOSleeper::process_impl() sleeping." << llendl; + mRespond = true; + static const F64 SLEEP_TIME = 2.0; + pump->sleepChain(SLEEP_TIME); + return STATUS_BREAK; + } + lldebugs << "LLIOSleeper::process_impl() responding." << llendl; + LLBufferStream ostr(channels, buffer.get()); + ostr << "huh? sorry, I was sleeping." << std::endl; + return STATUS_DONE; +} diff --git a/indra/test/llpipeutil.h b/indra/test/llpipeutil.h index 25311780ac..a52f141d55 100644 --- a/indra/test/llpipeutil.h +++ b/indra/test/llpipeutil.h @@ -145,4 +145,24 @@ protected: LLPumpIO* pump); }; +/** + * @brief Pipe that sleeps, and then responds later. + */ +class LLIOSleeper : public LLIOPipe +{ +public: + LLIOSleeper() : mRespond(false) {} + +protected: + virtual EStatus process_impl( + const LLChannelDescriptors& channels, + buffer_ptr_t& buffer, + bool& eos, + LLSD& context, + LLPumpIO* pump); +private: + bool mRespond; + +}; + #endif // LL_LLPIPEUTIL_H diff --git a/indra/test/test.cpp b/indra/test/test.cpp index fc8f8d9711..f573d53ba8 100644 --- a/indra/test/test.cpp +++ b/indra/test/test.cpp @@ -170,6 +170,7 @@ static const apr_getopt_option_t TEST_CL_OPTIONS[] = {"group", 'g', 1, "Run test group specified by option argument."}, {"skip", 's', 1, "Skip test number specified by option argument. Only works when a specific group is being tested"}, {"wait", 'w', 0, "Wait for input before exit."}, + {"debug", 'd', 0, "Emit full debug logs."}, {0, 0, 0, 0} }; @@ -224,7 +225,8 @@ int main(int argc, char **argv) LLError::initForApplication("."); LLError::setFatalFunction(wouldHaveCrashed); LLError::setDefaultLevel(LLError::LEVEL_ERROR); - // *FIX: should come from error config file + //< *TODO: should come from error config file. Note that we + // have a command line option that sets this to debug. #ifdef CTYPE_WORKAROUND ctype_workaround(); @@ -286,6 +288,11 @@ int main(int argc, char **argv) case 'w': wait_at_exit = true; break; + case 'd': + // *TODO: should come from error config file. We set it to + // ERROR by default, so this allows full debug levels. + LLError::setDefaultLevel(LLError::LEVEL_DEBUG); + break; default: stream_usage(std::cerr, argv[0]); return 1; |