summaryrefslogtreecommitdiff
path: root/indra/llmessage
diff options
context:
space:
mode:
authorAaron Brashears <aaronb@lindenlab.com>2007-12-05 01:15:45 +0000
committerAaron Brashears <aaronb@lindenlab.com>2007-12-05 01:15:45 +0000
commit2a9be0445b82fdca0fb98da20f74c0200a9bffe1 (patch)
tree7dcc2d3727b33d950aa9ea4026a9c3873eabb5ad /indra/llmessage
parentf8511d77a70bea452cde7270b47044358e58427c (diff)
Result of svn merge -r74235:74242 svn+ssh://svn/svn/linden/branches/robust-pump into release
Diffstat (limited to 'indra/llmessage')
-rw-r--r--indra/llmessage/lliopipe.cpp1
-rw-r--r--indra/llmessage/lliopipe.h9
-rw-r--r--indra/llmessage/lliosocket.cpp38
-rw-r--r--indra/llmessage/llpumpio.cpp216
-rw-r--r--indra/llmessage/llpumpio.h14
5 files changed, 228 insertions, 50 deletions
diff --git a/indra/llmessage/lliopipe.cpp b/indra/llmessage/lliopipe.cpp
index 8c0d01f7fc..a0cf5183f1 100644
--- a/indra/llmessage/lliopipe.cpp
+++ b/indra/llmessage/lliopipe.cpp
@@ -51,6 +51,7 @@ static const std::string STATUS_ERROR_NAMES[LLIOPipe::STATUS_ERROR_COUNT] =
std::string("STATUS_NOT_IMPLEMENTED"),
std::string("STATUS_PRECONDITION_NOT_MET"),
std::string("STATUS_NO_CONNECTION"),
+ std::string("STATUS_LOST_CONNECTION"),
std::string("STATUS_EXPIRED"),
};
diff --git a/indra/llmessage/lliopipe.h b/indra/llmessage/lliopipe.h
index 47924eecc6..e480f83b55 100644
--- a/indra/llmessage/lliopipe.h
+++ b/indra/llmessage/lliopipe.h
@@ -148,11 +148,14 @@ public:
// This means we could not connect to a remote host.
STATUS_NO_CONNECTION = -4,
- // This means we could not connect to a remote host.
- STATUS_EXPIRED = -5,
+ // The connection was lost.
+ STATUS_LOST_CONNECTION = -5,
+
+ // The total process time has exceeded the timeout.
+ STATUS_EXPIRED = -6,
// Keep track of the count of codes here.
- STATUS_ERROR_COUNT = 5,
+ STATUS_ERROR_COUNT = 6,
};
/**
diff --git a/indra/llmessage/lliosocket.cpp b/indra/llmessage/lliosocket.cpp
index 920a5e4aa2..b1f55a297c 100644
--- a/indra/llmessage/lliosocket.cpp
+++ b/indra/llmessage/lliosocket.cpp
@@ -64,6 +64,40 @@ bool is_addr_in_use(apr_status_t status)
#endif
}
+#if LL_LINUX
+// Define this to see the actual file descriptors being tossed around.
+//#define LL_DEBUG_SOCKET_FILE_DESCRIPTORS 1
+#if LL_DEBUG_SOCKET_FILE_DESCRIPTORS
+#include "apr-1/apr_portable.h"
+#endif
+#endif
+
+
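+// Debug helper: logs the OS file descriptor behind an APR socket (does nothing unless LL_DEBUG_SOCKET_FILE_DESCRIPTORS is enabled).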
+void ll_debug_socket(const char* msg, apr_socket_t* apr_sock)
+{
+#if LL_DEBUG_SOCKET_FILE_DESCRIPTORS
+ if(!apr_sock)
+ {
+ lldebugs << "Socket -- " << (msg?msg:"") << ": no socket." << llendl;
+ return;
+ }
+ // *TODO: Why doesn't this work?
+ //apr_os_sock_t os_sock;
+ int os_sock;
+ if(APR_SUCCESS == apr_os_sock_get(&os_sock, apr_sock))
+ {
+ lldebugs << "Socket -- " << (msg?msg:"") << " on fd " << os_sock
+ << " at " << apr_sock << llendl;
+ }
+ else
+ {
+ lldebugs << "Socket -- " << (msg?msg:"") << " no fd "
+ << " at " << apr_sock << llendl;
+ }
+#endif
+}
+
///
/// LLSocket
///
@@ -199,6 +233,7 @@ bool LLSocket::blockingConnect(const LLHost& host)
return false;
}
apr_socket_timeout_set(mSocket, 1000);
+ ll_debug_socket("Blocking connect", mSocket);
if(ll_apr_warn_status(apr_socket_connect(mSocket, sa))) return false;
setOptions();
return true;
@@ -209,6 +244,7 @@ LLSocket::LLSocket(apr_socket_t* socket, apr_pool_t* pool) :
mPool(pool),
mPort(PORT_INVALID)
{
+ ll_debug_socket("Constructing wholely formed socket", mSocket);
LLMemType m1(LLMemType::MTYPE_IO_TCP);
}
@@ -216,9 +252,9 @@ LLSocket::~LLSocket()
{
LLMemType m1(LLMemType::MTYPE_IO_TCP);
// *FIX: clean up memory we are holding.
- //lldebugs << "Destroying LLSocket" << llendl;
if(mSocket)
{
+ ll_debug_socket("Destroying socket", mSocket);
apr_socket_close(mSocket);
}
if(mPool)
diff --git a/indra/llmessage/llpumpio.cpp b/indra/llmessage/llpumpio.cpp
index ea7c06f38d..c3b298d020 100644
--- a/indra/llmessage/llpumpio.cpp
+++ b/indra/llmessage/llpumpio.cpp
@@ -34,6 +34,7 @@
#include "linden_common.h"
#include "llpumpio.h"
+#include <map>
#include <set>
#include "apr-1/apr_poll.h"
@@ -41,10 +42,15 @@
#include "llmemtype.h"
#include "llstl.h"
-// This should not be in production, but it is intensely useful during
-// development.
+// These should not be enabled in production, but they can be
+// intensely useful during development for finding certain kinds of
+// bugs.
#if LL_LINUX
-#define LL_DEBUG_PIPE_TYPE_IN_PUMP 0
+//#define LL_DEBUG_PIPE_TYPE_IN_PUMP 1
+//#define LL_DEBUG_POLL_FILE_DESCRIPTORS 1
+#if LL_DEBUG_POLL_FILE_DESCRIPTORS
+#include "apr-1/apr_portable.h"
+#endif
#endif
#if LL_DEBUG_PIPE_TYPE_IN_PUMP
@@ -73,6 +79,52 @@ extern const F32 NEVER_CHAIN_EXPIRY_SECS = 0.0f;
//#define LL_DEBUG_SPEW_BUFFER_CHANNEL_IN 1
//#define LL_DEBUG_SPEW_BUFFER_CHANNEL_OUT 1
+//
+// local functions
+//
+void ll_debug_poll_fd(const char* msg, const apr_pollfd_t* poll)
+{
+#if LL_DEBUG_POLL_FILE_DESCRIPTORS
+ if(!poll)
+ {
+ lldebugs << "Poll -- " << (msg?msg:"") << ": no pollfd." << llendl;
+ return;
+ }
+ if(poll->desc.s)
+ {
+ apr_os_sock_t os_sock;
+ if(APR_SUCCESS == apr_os_sock_get(&os_sock, poll->desc.s))
+ {
+ lldebugs << "Poll -- " << (msg?msg:"") << " on fd " << os_sock
+ << " at " << poll->desc.s << llendl;
+ }
+ else
+ {
+ lldebugs << "Poll -- " << (msg?msg:"") << " no fd "
+ << " at " << poll->desc.s << llendl;
+ }
+ }
+ else if(poll->desc.f)
+ {
+ apr_os_file_t os_file;
+ if(APR_SUCCESS == apr_os_file_get(&os_file, poll->desc.f))
+ {
+ lldebugs << "Poll -- " << (msg?msg:"") << " on fd " << os_file
+ << " at " << poll->desc.f << llendl;
+ }
+ else
+ {
+ lldebugs << "Poll -- " << (msg?msg:"") << " no fd "
+ << " at " << poll->desc.f << llendl;
+ }
+ }
+ else
+ {
+ lldebugs << "Poll -- " << (msg?msg:"") << ": no descriptor." << llendl;
+ }
+#endif
+}
+
/**
* @class
*/
@@ -217,50 +269,88 @@ bool LLPumpIO::setTimeoutSeconds(F32 timeout)
return true;
}
+static std::string events_2_string(apr_int16_t events)
+{
+ std::ostringstream ostr;
+ if(events & APR_POLLIN)
+ {
+ ostr << "read,";
+ }
+ if(events & APR_POLLPRI)
+ {
+ ostr << "priority,";
+ }
+ if(events & APR_POLLOUT)
+ {
+ ostr << "write,";
+ }
+ if(events & APR_POLLERR)
+ {
+ ostr << "error,";
+ }
+ if(events & APR_POLLHUP)
+ {
+ ostr << "hangup,";
+ }
+ if(events & APR_POLLNVAL)
+ {
+ ostr << "invalid,";
+ }
+ return chop_tail_copy(ostr.str(), 1);
+}
+
bool LLPumpIO::setConditional(LLIOPipe* pipe, const apr_pollfd_t* poll)
{
LLMemType m1(LLMemType::MTYPE_IO_PUMP);
- //lldebugs << "LLPumpIO::setConditional" << llendl;
- if(pipe)
+ if(!pipe) return false;
+ ll_debug_poll_fd("Set conditional", poll);
+
+ lldebugs << "Setting conditionals (" << events_2_string(poll->reqevents)
+ << ") "
+#if LL_DEBUG_PIPE_TYPE_IN_PUMP
+ << "on pipe " << typeid(*pipe).name()
+#endif
+ << " at " << pipe << llendl;
+
+ // remove any matching poll file descriptors for this pipe.
+ LLIOPipe::ptr_t pipe_ptr(pipe);
+ LLChainInfo::conditionals_t::iterator it;
+ it = (*mCurrentChain).mDescriptors.begin();
+ while(it != (*mCurrentChain).mDescriptors.end())
{
- // remove any matching poll file descriptors for this pipe.
- LLIOPipe::ptr_t pipe_ptr(pipe);
- LLChainInfo::conditionals_t::iterator it;
- it = (*mCurrentChain).mDescriptors.begin();
- while(it != (*mCurrentChain).mDescriptors.end())
+ LLChainInfo::pipe_conditional_t& value = (*it);
+ if(pipe_ptr == value.first)
{
- LLChainInfo::pipe_conditional_t& value = (*it);
- if(pipe_ptr == value.first)
- {
- ll_delete_apr_pollset_fd_client_data()(value);
- it = (*mCurrentChain).mDescriptors.erase(it);
- mRebuildPollset = true;
- }
- else
- {
- ++it;
- }
+ ll_delete_apr_pollset_fd_client_data()(value);
+ it = (*mCurrentChain).mDescriptors.erase(it);
+ mRebuildPollset = true;
}
-
- if(poll)
+ else
{
- LLChainInfo::pipe_conditional_t value;
- value.first = pipe_ptr;
- value.second = *poll;
- if(!poll->p)
- {
- // each fd needs a pool to work with, so if one was
- // not specified, use this pool.
- // *FIX: Should it always be this pool?
- value.second.p = mPool;
- }
- value.second.client_data = new S32(++mPollsetClientID);
- (*mCurrentChain).mDescriptors.push_back(value);
- mRebuildPollset = true;
+ ++it;
}
+ }
+
+ if(!poll)
+ {
+ mRebuildPollset = true;
return true;
}
- return false;
+ LLChainInfo::pipe_conditional_t value;
+ value.first = pipe_ptr;
+ value.second = *poll;
+ value.second.rtnevents = 0;
+ if(!poll->p)
+ {
+ // each fd needs a pool to work with, so if one was
+ // not specified, use this pool.
+ // *FIX: Should it always be this pool?
+ value.second.p = mPool;
+ }
+ value.second.client_data = new S32(++mPollsetClientID);
+ (*mCurrentChain).mDescriptors.push_back(value);
+ mRebuildPollset = true;
+ return true;
}
S32 LLPumpIO::setLock()
@@ -412,24 +502,25 @@ void LLPumpIO::pump(const S32& poll_timeout)
}
// Poll based on the last known pollset
- // *FIX: may want to pass in a poll timeout so it works correctly
+ // *TODO: may want to pass in a poll timeout so it works correctly
// in single and multi threaded processes.
PUMP_DEBUG;
- typedef std::set<S32> signal_client_t;
+ typedef std::map<S32, S32> signal_client_t;
signal_client_t signalled_client;
+ const apr_pollfd_t* poll_fd = NULL;
if(mPollset)
{
PUMP_DEBUG;
//llinfos << "polling" << llendl;
S32 count = 0;
S32 client_id = 0;
- const apr_pollfd_t* poll_fd = NULL;
apr_pollset_poll(mPollset, poll_timeout, &count, &poll_fd);
PUMP_DEBUG;
- for(S32 i = 0; i < count; ++i)
+ for(S32 ii = 0; ii < count; ++ii)
{
- client_id = *((S32*)poll_fd[i].client_data);
- signalled_client.insert(client_id);
+ ll_debug_poll_fd("Signalled pipe", &poll_fd[ii]);
+ client_id = *((S32*)poll_fd[ii].client_data);
+ signalled_client[client_id] = ii;
}
PUMP_DEBUG;
}
@@ -515,16 +606,49 @@ void LLPumpIO::pump(const S32& poll_timeout)
LLChainInfo::conditionals_t::iterator end;
end = (*run_chain).mDescriptors.end();
S32 client_id = 0;
+ signal_client_t::iterator signal;
for(; it != end; ++it)
{
PUMP_DEBUG;
client_id = *((S32*)((*it).second.client_data));
- if(signalled_client.find(client_id) != not_signalled)
+ signal = signalled_client.find(client_id);
+ if (signal == not_signalled) continue;
+ static const apr_int16_t POLL_CHAIN_ERROR =
+ APR_POLLHUP | APR_POLLNVAL | APR_POLLERR;
+ const apr_pollfd_t* poll = &(poll_fd[(*signal).second]);
+ if(poll->rtnevents & POLL_CHAIN_ERROR)
{
- process_this_chain = true;
+ // Potential error condition has been
+ // returned. If HUP was one of them, we pass
+ // that as the error even though there may be
+ // more. If there are in fact more errors,
+ // we'll just wait for that detection until
+ // the next pump() cycle to catch it so that
+ // the logic here gets no more strained than
+ // it already is.
+ LLIOPipe::EStatus error_status;
+ if(poll->rtnevents & APR_POLLHUP)
+ error_status = LLIOPipe::STATUS_LOST_CONNECTION;
+ else
+ error_status = LLIOPipe::STATUS_ERROR;
+ if(handleChainError(*run_chain, error_status)) break;
+ ll_debug_poll_fd("Removing pipe", poll);
+ llwarns << "Removing pipe "
+ << (*run_chain).mChainLinks[0].mPipe
+ << " '"
+ << typeid(
+ *((*run_chain).mChainLinks[0].mPipe)).name()
+ << "' because: "
+ << events_2_string(poll->rtnevents)
+ << llendl;
+ (*run_chain).mHead = (*run_chain).mChainLinks.end();
break;
}
- //llinfos << "no fd ready for this one." << llendl;
+
+ // at least 1 fd got signalled, and there were no
+ // errors. That means we process this chain.
+ process_this_chain = true;
+ break;
}
}
}
diff --git a/indra/llmessage/llpumpio.h b/indra/llmessage/llpumpio.h
index 4d865a82ec..1609650f1f 100644
--- a/indra/llmessage/llpumpio.h
+++ b/indra/llmessage/llpumpio.h
@@ -424,6 +424,20 @@ protected:
* @return Retuns true if someone handled the error
*/
bool handleChainError(LLChainInfo& chain, LLIOPipe::EStatus error);
+
+public:
+ /**
+ * @brief Return number of running chains.
+ *
+ * *NOTE: This is only used in debugging and not considered
+ * efficient or safe enough for production use.
+ */
+ running_chains_t::size_type runningChains() const
+ {
+ return mRunningChains.size();
+ }
+
+
};