summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--indra/llmessage/lliopipe.cpp1
-rw-r--r--indra/llmessage/lliopipe.h9
-rw-r--r--indra/llmessage/lliosocket.cpp38
-rw-r--r--indra/llmessage/llpumpio.cpp216
-rw-r--r--indra/llmessage/llpumpio.h14
-rw-r--r--indra/test/io.cpp64
-rw-r--r--indra/test/llpipeutil.cpp22
-rw-r--r--indra/test/llpipeutil.h20
-rw-r--r--indra/test/test.cpp9
9 files changed, 341 insertions, 52 deletions
diff --git a/indra/llmessage/lliopipe.cpp b/indra/llmessage/lliopipe.cpp
index 8c0d01f7fc..a0cf5183f1 100644
--- a/indra/llmessage/lliopipe.cpp
+++ b/indra/llmessage/lliopipe.cpp
@@ -51,6 +51,7 @@ static const std::string STATUS_ERROR_NAMES[LLIOPipe::STATUS_ERROR_COUNT] =
std::string("STATUS_NOT_IMPLEMENTED"),
std::string("STATUS_PRECONDITION_NOT_MET"),
std::string("STATUS_NO_CONNECTION"),
+ std::string("STATUS_LOST_CONNECTION"),
std::string("STATUS_EXPIRED"),
};
diff --git a/indra/llmessage/lliopipe.h b/indra/llmessage/lliopipe.h
index 47924eecc6..e480f83b55 100644
--- a/indra/llmessage/lliopipe.h
+++ b/indra/llmessage/lliopipe.h
@@ -148,11 +148,14 @@ public:
// This means we could not connect to a remote host.
STATUS_NO_CONNECTION = -4,
- // This means we could not connect to a remote host.
- STATUS_EXPIRED = -5,
+ // The connection was lost.
+ STATUS_LOST_CONNECTION = -5,
+
+ // The totoal process time has exceeded the timeout.
+ STATUS_EXPIRED = -6,
// Keep track of the count of codes here.
- STATUS_ERROR_COUNT = 5,
+ STATUS_ERROR_COUNT = 6,
};
/**
diff --git a/indra/llmessage/lliosocket.cpp b/indra/llmessage/lliosocket.cpp
index 920a5e4aa2..b1f55a297c 100644
--- a/indra/llmessage/lliosocket.cpp
+++ b/indra/llmessage/lliosocket.cpp
@@ -64,6 +64,40 @@ bool is_addr_in_use(apr_status_t status)
#endif
}
+#if LL_LINUX
+// Define this to see the actual file descriptors being tossed around.
+//#define LL_DEBUG_SOCKET_FILE_DESCRIPTORS 1
+#if LL_DEBUG_SOCKET_FILE_DESCRIPTORS
+#include "apr-1/apr_portable.h"
+#endif
+#endif
+
+
+// Quick function
+void ll_debug_socket(const char* msg, apr_socket_t* apr_sock)
+{
+#if LL_DEBUG_SOCKET_FILE_DESCRIPTORS
+ if(!apr_sock)
+ {
+ lldebugs << "Socket -- " << (msg?msg:"") << ": no socket." << llendl;
+ return;
+ }
+ // *TODO: Why doesn't this work?
+ //apr_os_sock_t os_sock;
+ int os_sock;
+ if(APR_SUCCESS == apr_os_sock_get(&os_sock, apr_sock))
+ {
+ lldebugs << "Socket -- " << (msg?msg:"") << " on fd " << os_sock
+ << " at " << apr_sock << llendl;
+ }
+ else
+ {
+ lldebugs << "Socket -- " << (msg?msg:"") << " no fd "
+ << " at " << apr_sock << llendl;
+ }
+#endif
+}
+
///
/// LLSocket
///
@@ -199,6 +233,7 @@ bool LLSocket::blockingConnect(const LLHost& host)
return false;
}
apr_socket_timeout_set(mSocket, 1000);
+ ll_debug_socket("Blocking connect", mSocket);
if(ll_apr_warn_status(apr_socket_connect(mSocket, sa))) return false;
setOptions();
return true;
@@ -209,6 +244,7 @@ LLSocket::LLSocket(apr_socket_t* socket, apr_pool_t* pool) :
mPool(pool),
mPort(PORT_INVALID)
{
+ ll_debug_socket("Constructing wholely formed socket", mSocket);
LLMemType m1(LLMemType::MTYPE_IO_TCP);
}
@@ -216,9 +252,9 @@ LLSocket::~LLSocket()
{
LLMemType m1(LLMemType::MTYPE_IO_TCP);
// *FIX: clean up memory we are holding.
- //lldebugs << "Destroying LLSocket" << llendl;
if(mSocket)
{
+ ll_debug_socket("Destroying socket", mSocket);
apr_socket_close(mSocket);
}
if(mPool)
diff --git a/indra/llmessage/llpumpio.cpp b/indra/llmessage/llpumpio.cpp
index ea7c06f38d..c3b298d020 100644
--- a/indra/llmessage/llpumpio.cpp
+++ b/indra/llmessage/llpumpio.cpp
@@ -34,6 +34,7 @@
#include "linden_common.h"
#include "llpumpio.h"
+#include <map>
#include <set>
#include "apr-1/apr_poll.h"
@@ -41,10 +42,15 @@
#include "llmemtype.h"
#include "llstl.h"
-// This should not be in production, but it is intensely useful during
-// development.
+// These should not be enabled in production, but they can be
+// intensely useful during development for finding certain kinds of
+// bugs.
#if LL_LINUX
-#define LL_DEBUG_PIPE_TYPE_IN_PUMP 0
+//#define LL_DEBUG_PIPE_TYPE_IN_PUMP 1
+//#define LL_DEBUG_POLL_FILE_DESCRIPTORS 1
+#if LL_DEBUG_POLL_FILE_DESCRIPTORS
+#include "apr-1/apr_portable.h"
+#endif
#endif
#if LL_DEBUG_PIPE_TYPE_IN_PUMP
@@ -73,6 +79,52 @@ extern const F32 NEVER_CHAIN_EXPIRY_SECS = 0.0f;
//#define LL_DEBUG_SPEW_BUFFER_CHANNEL_IN 1
//#define LL_DEBUG_SPEW_BUFFER_CHANNEL_OUT 1
+//
+// local functions
+//
+void ll_debug_poll_fd(const char* msg, const apr_pollfd_t* poll)
+{
+#if LL_DEBUG_POLL_FILE_DESCRIPTORS
+ if(!poll)
+ {
+ lldebugs << "Poll -- " << (msg?msg:"") << ": no pollfd." << llendl;
+ return;
+ }
+ if(poll->desc.s)
+ {
+ apr_os_sock_t os_sock;
+ if(APR_SUCCESS == apr_os_sock_get(&os_sock, poll->desc.s))
+ {
+ lldebugs << "Poll -- " << (msg?msg:"") << " on fd " << os_sock
+ << " at " << poll->desc.s << llendl;
+ }
+ else
+ {
+ lldebugs << "Poll -- " << (msg?msg:"") << " no fd "
+ << " at " << poll->desc.s << llendl;
+ }
+ }
+ else if(poll->desc.f)
+ {
+ apr_os_file_t os_file;
+ if(APR_SUCCESS == apr_os_file_get(&os_file, poll->desc.f))
+ {
+ lldebugs << "Poll -- " << (msg?msg:"") << " on fd " << os_file
+ << " at " << poll->desc.f << llendl;
+ }
+ else
+ {
+ lldebugs << "Poll -- " << (msg?msg:"") << " no fd "
+ << " at " << poll->desc.f << llendl;
+ }
+ }
+ else
+ {
+ lldebugs << "Poll -- " << (msg?msg:"") << ": no descriptor." << llendl;
+ }
+#endif
+}
+
/**
* @class
*/
@@ -217,50 +269,88 @@ bool LLPumpIO::setTimeoutSeconds(F32 timeout)
return true;
}
+static std::string events_2_string(apr_int16_t events)
+{
+ std::ostringstream ostr;
+ if(events & APR_POLLIN)
+ {
+ ostr << "read,";
+ }
+ if(events & APR_POLLPRI)
+ {
+ ostr << "priority,";
+ }
+ if(events & APR_POLLOUT)
+ {
+ ostr << "write,";
+ }
+ if(events & APR_POLLERR)
+ {
+ ostr << "error,";
+ }
+ if(events & APR_POLLHUP)
+ {
+ ostr << "hangup,";
+ }
+ if(events & APR_POLLNVAL)
+ {
+ ostr << "invalid,";
+ }
+ return chop_tail_copy(ostr.str(), 1);
+}
+
bool LLPumpIO::setConditional(LLIOPipe* pipe, const apr_pollfd_t* poll)
{
LLMemType m1(LLMemType::MTYPE_IO_PUMP);
- //lldebugs << "LLPumpIO::setConditional" << llendl;
- if(pipe)
+ if(!pipe) return false;
+ ll_debug_poll_fd("Set conditional", poll);
+
+ lldebugs << "Setting conditionals (" << events_2_string(poll->reqevents)
+ << ") "
+#if LL_DEBUG_PIPE_TYPE_IN_PUMP
+ << "on pipe " << typeid(*pipe).name()
+#endif
+ << " at " << pipe << llendl;
+
+ // remove any matching poll file descriptors for this pipe.
+ LLIOPipe::ptr_t pipe_ptr(pipe);
+ LLChainInfo::conditionals_t::iterator it;
+ it = (*mCurrentChain).mDescriptors.begin();
+ while(it != (*mCurrentChain).mDescriptors.end())
{
- // remove any matching poll file descriptors for this pipe.
- LLIOPipe::ptr_t pipe_ptr(pipe);
- LLChainInfo::conditionals_t::iterator it;
- it = (*mCurrentChain).mDescriptors.begin();
- while(it != (*mCurrentChain).mDescriptors.end())
+ LLChainInfo::pipe_conditional_t& value = (*it);
+ if(pipe_ptr == value.first)
{
- LLChainInfo::pipe_conditional_t& value = (*it);
- if(pipe_ptr == value.first)
- {
- ll_delete_apr_pollset_fd_client_data()(value);
- it = (*mCurrentChain).mDescriptors.erase(it);
- mRebuildPollset = true;
- }
- else
- {
- ++it;
- }
+ ll_delete_apr_pollset_fd_client_data()(value);
+ it = (*mCurrentChain).mDescriptors.erase(it);
+ mRebuildPollset = true;
}
-
- if(poll)
+ else
{
- LLChainInfo::pipe_conditional_t value;
- value.first = pipe_ptr;
- value.second = *poll;
- if(!poll->p)
- {
- // each fd needs a pool to work with, so if one was
- // not specified, use this pool.
- // *FIX: Should it always be this pool?
- value.second.p = mPool;
- }
- value.second.client_data = new S32(++mPollsetClientID);
- (*mCurrentChain).mDescriptors.push_back(value);
- mRebuildPollset = true;
+ ++it;
}
+ }
+
+ if(!poll)
+ {
+ mRebuildPollset = true;
return true;
}
- return false;
+ LLChainInfo::pipe_conditional_t value;
+ value.first = pipe_ptr;
+ value.second = *poll;
+ value.second.rtnevents = 0;
+ if(!poll->p)
+ {
+ // each fd needs a pool to work with, so if one was
+ // not specified, use this pool.
+ // *FIX: Should it always be this pool?
+ value.second.p = mPool;
+ }
+ value.second.client_data = new S32(++mPollsetClientID);
+ (*mCurrentChain).mDescriptors.push_back(value);
+ mRebuildPollset = true;
+ return true;
}
S32 LLPumpIO::setLock()
@@ -412,24 +502,25 @@ void LLPumpIO::pump(const S32& poll_timeout)
}
// Poll based on the last known pollset
- // *FIX: may want to pass in a poll timeout so it works correctly
+ // *TODO: may want to pass in a poll timeout so it works correctly
// in single and multi threaded processes.
PUMP_DEBUG;
- typedef std::set<S32> signal_client_t;
+ typedef std::map<S32, S32> signal_client_t;
signal_client_t signalled_client;
+ const apr_pollfd_t* poll_fd = NULL;
if(mPollset)
{
PUMP_DEBUG;
//llinfos << "polling" << llendl;
S32 count = 0;
S32 client_id = 0;
- const apr_pollfd_t* poll_fd = NULL;
apr_pollset_poll(mPollset, poll_timeout, &count, &poll_fd);
PUMP_DEBUG;
- for(S32 i = 0; i < count; ++i)
+ for(S32 ii = 0; ii < count; ++ii)
{
- client_id = *((S32*)poll_fd[i].client_data);
- signalled_client.insert(client_id);
+ ll_debug_poll_fd("Signalled pipe", &poll_fd[ii]);
+ client_id = *((S32*)poll_fd[ii].client_data);
+ signalled_client[client_id] = ii;
}
PUMP_DEBUG;
}
@@ -515,16 +606,49 @@ void LLPumpIO::pump(const S32& poll_timeout)
LLChainInfo::conditionals_t::iterator end;
end = (*run_chain).mDescriptors.end();
S32 client_id = 0;
+ signal_client_t::iterator signal;
for(; it != end; ++it)
{
PUMP_DEBUG;
client_id = *((S32*)((*it).second.client_data));
- if(signalled_client.find(client_id) != not_signalled)
+ signal = signalled_client.find(client_id);
+ if (signal == not_signalled) continue;
+ static const apr_int16_t POLL_CHAIN_ERROR =
+ APR_POLLHUP | APR_POLLNVAL | APR_POLLERR;
+ const apr_pollfd_t* poll = &(poll_fd[(*signal).second]);
+ if(poll->rtnevents & POLL_CHAIN_ERROR)
{
- process_this_chain = true;
+ // Potential eror condition has been
+ // returned. If HUP was one of them, we pass
+ // that as the error even though there may be
+ // more. If there are in fact more errors,
+ // we'll just wait for that detection until
+ // the next pump() cycle to catch it so that
+ // the logic here gets no more strained than
+ // it already is.
+ LLIOPipe::EStatus error_status;
+ if(poll->rtnevents & APR_POLLHUP)
+ error_status = LLIOPipe::STATUS_LOST_CONNECTION;
+ else
+ error_status = LLIOPipe::STATUS_ERROR;
+ if(handleChainError(*run_chain, error_status)) break;
+ ll_debug_poll_fd("Removing pipe", poll);
+ llwarns << "Removing pipe "
+ << (*run_chain).mChainLinks[0].mPipe
+ << " '"
+ << typeid(
+ *((*run_chain).mChainLinks[0].mPipe)).name()
+ << "' because: "
+ << events_2_string(poll->rtnevents)
+ << llendl;
+ (*run_chain).mHead = (*run_chain).mChainLinks.end();
break;
}
- //llinfos << "no fd ready for this one." << llendl;
+
+ // at least 1 fd got signalled, and there were no
+ // errors. That means we process this chain.
+ process_this_chain = true;
+ break;
}
}
}
diff --git a/indra/llmessage/llpumpio.h b/indra/llmessage/llpumpio.h
index 4d865a82ec..1609650f1f 100644
--- a/indra/llmessage/llpumpio.h
+++ b/indra/llmessage/llpumpio.h
@@ -424,6 +424,20 @@ protected:
* @return Retuns true if someone handled the error
*/
bool handleChainError(LLChainInfo& chain, LLIOPipe::EStatus error);
+
+public:
+ /**
+ * @brief Return number of running chains.
+ *
+ * *NOTE: This is only used in debugging and not considered
+ * efficient or safe enough for production use.
+ */
+ running_chains_t::size_type runningChains() const
+ {
+ return mRunningChains.size();
+ }
+
+
};
diff --git a/indra/test/io.cpp b/indra/test/io.cpp
index 363f375014..350fc5394b 100644
--- a/indra/test/io.cpp
+++ b/indra/test/io.cpp
@@ -1080,7 +1080,7 @@ namespace tut
mPool,
mSocket,
factory);
- server->setResponseTimeout(SHORT_CHAIN_EXPIRY_SECS + 2.0f);
+ server->setResponseTimeout(SHORT_CHAIN_EXPIRY_SECS + 1.80f);
chain.push_back(LLIOPipe::ptr_t(server));
mPump->addChain(chain, NEVER_CHAIN_EXPIRY_SECS);
@@ -1108,6 +1108,68 @@ namespace tut
F32 elapsed = pump_loop(mPump, SHORT_CHAIN_EXPIRY_SECS + 3.0f);
ensure("Did not take too long", (elapsed < DEFAULT_CHAIN_EXPIRY_SECS));
}
+
+ template<> template<>
+ void fitness_test_object::test<5>()
+ {
+ // Set up the server
+ LLPumpIO::chain_t chain;
+ typedef LLCloneIOFactory<LLIOSleeper> sleeper_t;
+ sleeper_t* sleeper = new sleeper_t(new LLIOSleeper);
+ boost::shared_ptr<LLChainIOFactory> factory(sleeper);
+ LLIOServerSocket* server = new LLIOServerSocket(
+ mPool,
+ mSocket,
+ factory);
+ server->setResponseTimeout(1.0);
+ chain.push_back(LLIOPipe::ptr_t(server));
+ mPump->addChain(chain, NEVER_CHAIN_EXPIRY_SECS);
+ // We need to tickle the pump a little to set up the listen()
+ pump_loop(mPump, 0.1f);
+ U32 count = mPump->runningChains();
+ ensure_equals("server chain onboard", count, 1);
+ lldebugs << "** Server is up." << llendl;
+
+ // Set up the client
+ LLSocket::ptr_t client = LLSocket::create(mPool, LLSocket::STREAM_TCP);
+ LLHost server_host("127.0.0.1", SERVER_LISTEN_PORT);
+ bool connected = client->blockingConnect(server_host);
+ ensure("Connected to server", connected);
+ lldebugs << "connected" << llendl;
+ F32 elapsed = pump_loop(mPump,0.1f);
+ count = mPump->runningChains();
+ ensure_equals("server chain onboard", count, 2);
+ lldebugs << "** Client is connected." << llendl;
+
+ // We have connected, since the socket reader does not block,
+ // the first call to read data will return EAGAIN, so we need
+ // to write something.
+ chain.clear();
+ chain.push_back(LLIOPipe::ptr_t(new LLPipeStringInjector("hi")));
+ chain.push_back(LLIOPipe::ptr_t(new LLIOSocketWriter(client)));
+ chain.push_back(LLIOPipe::ptr_t(new LLIONull));
+ mPump->addChain(chain, 0.2);
+ chain.clear();
+
+ // pump for a bit and make sure all 3 chains are running
+ elapsed = pump_loop(mPump,0.1f);
+ count = mPump->runningChains();
+ ensure_equals("client chain onboard", count, 3);
+ lldebugs << "** request should have been sent." << llendl;
+
+ // pump for long enough the the client socket closes, and the
+ // server socket should not be closed yet.
+ elapsed = pump_loop(mPump,0.2f);
+ count = mPump->runningChains();
+ ensure_equals("client chain timed out ", count, 2);
+ lldebugs << "** client chain should be closed." << llendl;
+
+ // At this point, the socket should be closed by the timeout
+ elapsed = pump_loop(mPump,1.0f);
+ count = mPump->runningChains();
+ ensure_equals("accepted socked close", count, 1);
+ lldebugs << "** Sleeper should have timed out.." << llendl;
+ }
}
namespace tut
diff --git a/indra/test/llpipeutil.cpp b/indra/test/llpipeutil.cpp
index b7b9122615..c9c1eeb8b4 100644
--- a/indra/test/llpipeutil.cpp
+++ b/indra/test/llpipeutil.cpp
@@ -164,3 +164,25 @@ LLIOPipe::EStatus LLIONull::process_impl(
{
return STATUS_OK;
}
+
+// virtual
+LLIOPipe::EStatus LLIOSleeper::process_impl(
+ const LLChannelDescriptors& channels,
+ buffer_ptr_t& buffer,
+ bool& eos,
+ LLSD& context,
+ LLPumpIO* pump)
+{
+ if(!mRespond)
+ {
+ lldebugs << "LLIOSleeper::process_impl() sleeping." << llendl;
+ mRespond = true;
+ static const F64 SLEEP_TIME = 2.0;
+ pump->sleepChain(SLEEP_TIME);
+ return STATUS_BREAK;
+ }
+ lldebugs << "LLIOSleeper::process_impl() responding." << llendl;
+ LLBufferStream ostr(channels, buffer.get());
+ ostr << "huh? sorry, I was sleeping." << std::endl;
+ return STATUS_DONE;
+}
diff --git a/indra/test/llpipeutil.h b/indra/test/llpipeutil.h
index 25311780ac..a52f141d55 100644
--- a/indra/test/llpipeutil.h
+++ b/indra/test/llpipeutil.h
@@ -145,4 +145,24 @@ protected:
LLPumpIO* pump);
};
+/**
+ * @brief Pipe that sleeps, and then responds later.
+ */
+class LLIOSleeper : public LLIOPipe
+{
+public:
+ LLIOSleeper() : mRespond(false) {}
+
+protected:
+ virtual EStatus process_impl(
+ const LLChannelDescriptors& channels,
+ buffer_ptr_t& buffer,
+ bool& eos,
+ LLSD& context,
+ LLPumpIO* pump);
+private:
+ bool mRespond;
+
+};
+
#endif // LL_LLPIPEUTIL_H
diff --git a/indra/test/test.cpp b/indra/test/test.cpp
index fc8f8d9711..f573d53ba8 100644
--- a/indra/test/test.cpp
+++ b/indra/test/test.cpp
@@ -170,6 +170,7 @@ static const apr_getopt_option_t TEST_CL_OPTIONS[] =
{"group", 'g', 1, "Run test group specified by option argument."},
{"skip", 's', 1, "Skip test number specified by option argument. Only works when a specific group is being tested"},
{"wait", 'w', 0, "Wait for input before exit."},
+ {"debug", 'd', 0, "Emit full debug logs."},
{0, 0, 0, 0}
};
@@ -224,7 +225,8 @@ int main(int argc, char **argv)
LLError::initForApplication(".");
LLError::setFatalFunction(wouldHaveCrashed);
LLError::setDefaultLevel(LLError::LEVEL_ERROR);
- // *FIX: should come from error config file
+ //< *TODO: should come from error config file. Note that we
+ // have a command line option that sets this to debug.
#ifdef CTYPE_WORKAROUND
ctype_workaround();
@@ -286,6 +288,11 @@ int main(int argc, char **argv)
case 'w':
wait_at_exit = true;
break;
+ case 'd':
+ // *TODO: should come from error config file. We set it to
+ // ERROR by default, so this allows full debug levels.
+ LLError::setDefaultLevel(LLError::LEVEL_DEBUG);
+ break;
default:
stream_usage(std::cerr, argv[0]);
return 1;