diff options
| -rw-r--r-- | indra/llmessage/lliopipe.cpp | 1 | ||||
| -rw-r--r-- | indra/llmessage/lliopipe.h | 9 | ||||
| -rw-r--r-- | indra/llmessage/lliosocket.cpp | 38 | ||||
| -rw-r--r-- | indra/llmessage/llpumpio.cpp | 216 | ||||
| -rw-r--r-- | indra/llmessage/llpumpio.h | 14 | ||||
| -rw-r--r-- | indra/test/io.cpp | 64 | ||||
| -rw-r--r-- | indra/test/llpipeutil.cpp | 22 | ||||
| -rw-r--r-- | indra/test/llpipeutil.h | 20 | ||||
| -rw-r--r-- | indra/test/test.cpp | 9 | 
9 files changed, 341 insertions, 52 deletions
| diff --git a/indra/llmessage/lliopipe.cpp b/indra/llmessage/lliopipe.cpp index 8c0d01f7fc..a0cf5183f1 100644 --- a/indra/llmessage/lliopipe.cpp +++ b/indra/llmessage/lliopipe.cpp @@ -51,6 +51,7 @@ static const std::string STATUS_ERROR_NAMES[LLIOPipe::STATUS_ERROR_COUNT] =  	std::string("STATUS_NOT_IMPLEMENTED"),  	std::string("STATUS_PRECONDITION_NOT_MET"),  	std::string("STATUS_NO_CONNECTION"), +	std::string("STATUS_LOST_CONNECTION"),  	std::string("STATUS_EXPIRED"),  }; diff --git a/indra/llmessage/lliopipe.h b/indra/llmessage/lliopipe.h index 47924eecc6..e480f83b55 100644 --- a/indra/llmessage/lliopipe.h +++ b/indra/llmessage/lliopipe.h @@ -148,11 +148,14 @@ public:  		// This means we could not connect to a remote host.  		STATUS_NO_CONNECTION = -4, -		// This means we could not connect to a remote host. -		STATUS_EXPIRED = -5, +		// The connection was lost. +		STATUS_LOST_CONNECTION = -5, + +		// The totoal process time has exceeded the timeout. +		STATUS_EXPIRED = -6,  		// Keep track of the count of codes here. -		STATUS_ERROR_COUNT = 5, +		STATUS_ERROR_COUNT = 6,  	};  	/**  diff --git a/indra/llmessage/lliosocket.cpp b/indra/llmessage/lliosocket.cpp index 920a5e4aa2..b1f55a297c 100644 --- a/indra/llmessage/lliosocket.cpp +++ b/indra/llmessage/lliosocket.cpp @@ -64,6 +64,40 @@ bool is_addr_in_use(apr_status_t status)  #endif  } +#if LL_LINUX +// Define this to see the actual file descriptors being tossed around. +//#define LL_DEBUG_SOCKET_FILE_DESCRIPTORS 1 +#if LL_DEBUG_SOCKET_FILE_DESCRIPTORS +#include "apr-1/apr_portable.h" +#endif +#endif + + +// Quick function  +void ll_debug_socket(const char* msg, apr_socket_t* apr_sock) +{ +#if LL_DEBUG_SOCKET_FILE_DESCRIPTORS +	if(!apr_sock) +	{ +		lldebugs << "Socket -- " << (msg?msg:"") << ": no socket." << llendl; +		return; +	} +	// *TODO: Why doesn't this work? +	//apr_os_sock_t os_sock; +	int os_sock; +	if(APR_SUCCESS == apr_os_sock_get(&os_sock, apr_sock)) +	{ +		lldebugs << "Socket -- " << (msg?msg:"") << " on fd " << os_sock +			<< " at " << apr_sock << llendl; +	} +	else +	{ +		lldebugs << "Socket -- " << (msg?msg:"") << " no fd " +			<< " at " << apr_sock << llendl; +	} +#endif +} +  ///  /// LLSocket  /// @@ -199,6 +233,7 @@ bool LLSocket::blockingConnect(const LLHost& host)  		return false;  	}  	apr_socket_timeout_set(mSocket, 1000); +	ll_debug_socket("Blocking connect", mSocket);  	if(ll_apr_warn_status(apr_socket_connect(mSocket, sa))) return false;  	setOptions();  	return true; @@ -209,6 +244,7 @@ LLSocket::LLSocket(apr_socket_t* socket, apr_pool_t* pool) :  	mPool(pool),  	mPort(PORT_INVALID)  { +	ll_debug_socket("Constructing wholely formed socket", mSocket);  	LLMemType m1(LLMemType::MTYPE_IO_TCP);  } @@ -216,9 +252,9 @@ LLSocket::~LLSocket()  {  	LLMemType m1(LLMemType::MTYPE_IO_TCP);  	// *FIX: clean up memory we are holding. -	//lldebugs << "Destroying LLSocket" << llendl;  	if(mSocket)  	{ +		ll_debug_socket("Destroying socket", mSocket);  		apr_socket_close(mSocket);  	}  	if(mPool) diff --git a/indra/llmessage/llpumpio.cpp b/indra/llmessage/llpumpio.cpp index ea7c06f38d..c3b298d020 100644 --- a/indra/llmessage/llpumpio.cpp +++ b/indra/llmessage/llpumpio.cpp @@ -34,6 +34,7 @@  #include "linden_common.h"  #include "llpumpio.h" +#include <map>  #include <set>  #include "apr-1/apr_poll.h" @@ -41,10 +42,15 @@  #include "llmemtype.h"  #include "llstl.h" -// This should not be in production, but it is intensely useful during -// development. +// These should not be enabled in production, but they can be +// intensely useful during development for finding certain kinds of +// bugs.  #if LL_LINUX -#define LL_DEBUG_PIPE_TYPE_IN_PUMP 0 +//#define LL_DEBUG_PIPE_TYPE_IN_PUMP 1 +//#define LL_DEBUG_POLL_FILE_DESCRIPTORS 1 +#if LL_DEBUG_POLL_FILE_DESCRIPTORS +#include "apr-1/apr_portable.h" +#endif  #endif  #if LL_DEBUG_PIPE_TYPE_IN_PUMP @@ -73,6 +79,52 @@ extern const F32 NEVER_CHAIN_EXPIRY_SECS = 0.0f;  //#define LL_DEBUG_SPEW_BUFFER_CHANNEL_IN 1  //#define LL_DEBUG_SPEW_BUFFER_CHANNEL_OUT 1 +// +// local functions +// +void ll_debug_poll_fd(const char* msg, const apr_pollfd_t* poll) +{ +#if LL_DEBUG_POLL_FILE_DESCRIPTORS +	if(!poll) +	{ +		lldebugs << "Poll -- " << (msg?msg:"") << ": no pollfd." << llendl; +		return; +	} +	if(poll->desc.s) +	{ +		apr_os_sock_t os_sock; +		if(APR_SUCCESS == apr_os_sock_get(&os_sock, poll->desc.s)) +		{ +			lldebugs << "Poll -- " << (msg?msg:"") << " on fd " << os_sock +				 << " at " << poll->desc.s << llendl; +		} +		else +		{ +			lldebugs << "Poll -- " << (msg?msg:"") << " no fd " +				 << " at " << poll->desc.s << llendl; +		} +	} +	else if(poll->desc.f) +	{ +		apr_os_file_t os_file; +		if(APR_SUCCESS == apr_os_file_get(&os_file, poll->desc.f)) +		{ +			lldebugs << "Poll -- " << (msg?msg:"") << " on fd " << os_file +				 << " at " << poll->desc.f << llendl; +		} +		else +		{ +			lldebugs << "Poll -- " << (msg?msg:"") << " no fd " +				 << " at " << poll->desc.f << llendl; +		} +	} +	else +	{ +		lldebugs << "Poll -- " << (msg?msg:"") << ": no descriptor." << llendl; +	} +#endif	 +} +  /**   * @class   */ @@ -217,50 +269,88 @@ bool LLPumpIO::setTimeoutSeconds(F32 timeout)  	return true;  } +static std::string events_2_string(apr_int16_t events) +{ +	std::ostringstream ostr; +	if(events & APR_POLLIN) +	{ +		ostr << "read,"; +	} +	if(events & APR_POLLPRI) +	{ +		ostr << "priority,"; +	} +	if(events & APR_POLLOUT) +	{ +		ostr << "write,"; +	} +	if(events & APR_POLLERR) +	{ +		ostr << "error,"; +	} +	if(events & APR_POLLHUP) +	{ +		ostr << "hangup,"; +	} +	if(events & APR_POLLNVAL) +	{ +		ostr << "invalid,"; +	} +	return chop_tail_copy(ostr.str(), 1); +} +  bool LLPumpIO::setConditional(LLIOPipe* pipe, const apr_pollfd_t* poll)  {  	LLMemType m1(LLMemType::MTYPE_IO_PUMP); -	//lldebugs << "LLPumpIO::setConditional" << llendl; -	if(pipe) +	if(!pipe) return false; +	ll_debug_poll_fd("Set conditional", poll); + +	lldebugs << "Setting conditionals (" << events_2_string(poll->reqevents) +		 << ") " +#if LL_DEBUG_PIPE_TYPE_IN_PUMP +		 << "on pipe " << typeid(*pipe).name()  +#endif +		 << " at " << pipe << llendl; + +	// remove any matching poll file descriptors for this pipe. +	LLIOPipe::ptr_t pipe_ptr(pipe); +	LLChainInfo::conditionals_t::iterator it; +	it = (*mCurrentChain).mDescriptors.begin(); +	while(it != (*mCurrentChain).mDescriptors.end())  	{ -		// remove any matching poll file descriptors for this pipe. -		LLIOPipe::ptr_t pipe_ptr(pipe); -		LLChainInfo::conditionals_t::iterator it; -		it = (*mCurrentChain).mDescriptors.begin(); -		while(it != (*mCurrentChain).mDescriptors.end()) +		LLChainInfo::pipe_conditional_t& value = (*it); +		if(pipe_ptr == value.first)  		{ -			LLChainInfo::pipe_conditional_t& value = (*it); -			if(pipe_ptr == value.first) -			{ -				ll_delete_apr_pollset_fd_client_data()(value); -				it = (*mCurrentChain).mDescriptors.erase(it); -				mRebuildPollset = true; -			} -			else -			{ -				++it; -			} +			ll_delete_apr_pollset_fd_client_data()(value); +			it = (*mCurrentChain).mDescriptors.erase(it); +			mRebuildPollset = true;  		} - -		if(poll) +		else  		{ -			LLChainInfo::pipe_conditional_t value; -			value.first = pipe_ptr; -			value.second = *poll; -			if(!poll->p) -			{ -				// each fd needs a pool to work with, so if one was -				// not specified, use this pool. -				// *FIX: Should it always be this pool? -				value.second.p = mPool; -			} -			value.second.client_data = new S32(++mPollsetClientID); -			(*mCurrentChain).mDescriptors.push_back(value); -			mRebuildPollset = true; +			++it;  		} +	} + +	if(!poll) +	{ +		mRebuildPollset = true;  		return true;  	} -	return false; +	LLChainInfo::pipe_conditional_t value; +	value.first = pipe_ptr; +	value.second = *poll; +	value.second.rtnevents = 0; +	if(!poll->p) +	{ +		// each fd needs a pool to work with, so if one was +		// not specified, use this pool. +		// *FIX: Should it always be this pool? +		value.second.p = mPool; +	} +	value.second.client_data = new S32(++mPollsetClientID); +	(*mCurrentChain).mDescriptors.push_back(value); +	mRebuildPollset = true; +	return true;  }  S32 LLPumpIO::setLock() @@ -412,24 +502,25 @@ void LLPumpIO::pump(const S32& poll_timeout)  	}  	// Poll based on the last known pollset -	// *FIX: may want to pass in a poll timeout so it works correctly +	// *TODO: may want to pass in a poll timeout so it works correctly  	// in single and multi threaded processes.  	PUMP_DEBUG; -	typedef std::set<S32> signal_client_t; +	typedef std::map<S32, S32> signal_client_t;  	signal_client_t signalled_client; +	const apr_pollfd_t* poll_fd = NULL;  	if(mPollset)  	{  		PUMP_DEBUG;  		//llinfos << "polling" << llendl;  		S32 count = 0;  		S32 client_id = 0; -		const apr_pollfd_t* poll_fd = NULL;  		apr_pollset_poll(mPollset, poll_timeout, &count, &poll_fd);  		PUMP_DEBUG; -		for(S32 i = 0; i < count; ++i) +		for(S32 ii = 0; ii < count; ++ii)  		{ -			client_id = *((S32*)poll_fd[i].client_data); -			signalled_client.insert(client_id); +			ll_debug_poll_fd("Signalled pipe", &poll_fd[ii]); +			client_id = *((S32*)poll_fd[ii].client_data); +			signalled_client[client_id] = ii;  		}  		PUMP_DEBUG;  	} @@ -515,16 +606,49 @@ void LLPumpIO::pump(const S32& poll_timeout)  				LLChainInfo::conditionals_t::iterator end;  				end = (*run_chain).mDescriptors.end();  				S32 client_id = 0; +				signal_client_t::iterator signal;  				for(; it != end; ++it)  				{  					PUMP_DEBUG;  					client_id = *((S32*)((*it).second.client_data)); -					if(signalled_client.find(client_id) != not_signalled) +					signal = signalled_client.find(client_id); +					if (signal == not_signalled) continue; +					static const apr_int16_t POLL_CHAIN_ERROR = +						APR_POLLHUP | APR_POLLNVAL | APR_POLLERR; +					const apr_pollfd_t* poll = &(poll_fd[(*signal).second]); +					if(poll->rtnevents & POLL_CHAIN_ERROR)  					{ -						process_this_chain = true; +						// Potential eror condition has been +						// returned. If HUP was one of them, we pass +						// that as the error even though there may be +						// more. If there are in fact more errors, +						// we'll just wait for that detection until +						// the next pump() cycle to catch it so that +						// the logic here gets no more strained than +						// it already is. +						LLIOPipe::EStatus error_status; +						if(poll->rtnevents & APR_POLLHUP) +							error_status = LLIOPipe::STATUS_LOST_CONNECTION; +						else +							error_status = LLIOPipe::STATUS_ERROR; +						if(handleChainError(*run_chain, error_status)) break; +						ll_debug_poll_fd("Removing pipe", poll); +						llwarns << "Removing pipe " +							<< (*run_chain).mChainLinks[0].mPipe +							<< " '" +							<< typeid( +								*((*run_chain).mChainLinks[0].mPipe)).name() +							<< "' because: " +							<< events_2_string(poll->rtnevents) +							<< llendl; +						(*run_chain).mHead = (*run_chain).mChainLinks.end();  						break;  					} -					//llinfos << "no fd ready for this one." << llendl; + +					// at least 1 fd got signalled, and there were no +					// errors. That means we process this chain. +					process_this_chain = true; +					break;  				}  			}  		} diff --git a/indra/llmessage/llpumpio.h b/indra/llmessage/llpumpio.h index 4d865a82ec..1609650f1f 100644 --- a/indra/llmessage/llpumpio.h +++ b/indra/llmessage/llpumpio.h @@ -424,6 +424,20 @@ protected:  	 * @return Retuns true if someone handled the error  	 */  	bool handleChainError(LLChainInfo& chain, LLIOPipe::EStatus error); + +public: +	/**  +	 * @brief Return number of running chains. +	 * +	 * *NOTE: This is only used in debugging and not considered +	 * efficient or safe enough for production use. +	 */ +	running_chains_t::size_type runningChains() const +	{ +		return mRunningChains.size(); +	} + +  }; diff --git a/indra/test/io.cpp b/indra/test/io.cpp index 363f375014..350fc5394b 100644 --- a/indra/test/io.cpp +++ b/indra/test/io.cpp @@ -1080,7 +1080,7 @@ namespace tut  			mPool,  			mSocket,  			factory); -		server->setResponseTimeout(SHORT_CHAIN_EXPIRY_SECS + 2.0f); +		server->setResponseTimeout(SHORT_CHAIN_EXPIRY_SECS + 1.80f);  		chain.push_back(LLIOPipe::ptr_t(server));  		mPump->addChain(chain, NEVER_CHAIN_EXPIRY_SECS); @@ -1108,6 +1108,68 @@ namespace tut  		F32 elapsed = pump_loop(mPump, SHORT_CHAIN_EXPIRY_SECS + 3.0f);  		ensure("Did not take too long", (elapsed < DEFAULT_CHAIN_EXPIRY_SECS));  	} + +	template<> template<> +	void fitness_test_object::test<5>() +	{ +		// Set up the server +		LLPumpIO::chain_t chain; +		typedef LLCloneIOFactory<LLIOSleeper> sleeper_t; +		sleeper_t* sleeper = new sleeper_t(new LLIOSleeper); +		boost::shared_ptr<LLChainIOFactory> factory(sleeper); +		LLIOServerSocket* server = new LLIOServerSocket( +			mPool, +			mSocket, +			factory); +		server->setResponseTimeout(1.0); +		chain.push_back(LLIOPipe::ptr_t(server)); +		mPump->addChain(chain, NEVER_CHAIN_EXPIRY_SECS); +		// We need to tickle the pump a little to set up the listen() +		pump_loop(mPump, 0.1f); +		U32 count = mPump->runningChains(); +		ensure_equals("server chain onboard", count, 1); +		lldebugs << "** Server is up." << llendl; + +		// Set up the client +		LLSocket::ptr_t client = LLSocket::create(mPool, LLSocket::STREAM_TCP); +		LLHost server_host("127.0.0.1", SERVER_LISTEN_PORT); +		bool connected = client->blockingConnect(server_host); +		ensure("Connected to server", connected); +		lldebugs << "connected" << llendl; +		F32 elapsed = pump_loop(mPump,0.1f); +		count = mPump->runningChains(); +		ensure_equals("server chain onboard", count, 2); +		lldebugs << "** Client is connected." << llendl; + +		// We have connected, since the socket reader does not block, +		// the first call to read data will return EAGAIN, so we need +		// to write something. +		chain.clear(); +		chain.push_back(LLIOPipe::ptr_t(new LLPipeStringInjector("hi"))); +		chain.push_back(LLIOPipe::ptr_t(new LLIOSocketWriter(client))); +		chain.push_back(LLIOPipe::ptr_t(new LLIONull)); +		mPump->addChain(chain, 0.2); +		chain.clear(); + +		// pump for a bit and make sure all 3 chains are running +		elapsed = pump_loop(mPump,0.1f); +		count = mPump->runningChains(); +		ensure_equals("client chain onboard", count, 3); +		lldebugs << "** request should have been sent." << llendl; + +		// pump for long enough the the client socket closes, and the +		// server socket should not be closed yet. +		elapsed = pump_loop(mPump,0.2f); +		count = mPump->runningChains(); +		ensure_equals("client chain timed out ", count, 2); +		lldebugs << "** client chain should be closed." << llendl; + +		// At this point, the socket should be closed by the timeout +		elapsed = pump_loop(mPump,1.0f); +		count = mPump->runningChains(); +		ensure_equals("accepted socked close", count, 1); +		lldebugs << "** Sleeper should have timed out.." << llendl; +	}  }  namespace tut diff --git a/indra/test/llpipeutil.cpp b/indra/test/llpipeutil.cpp index b7b9122615..c9c1eeb8b4 100644 --- a/indra/test/llpipeutil.cpp +++ b/indra/test/llpipeutil.cpp @@ -164,3 +164,25 @@ LLIOPipe::EStatus LLIONull::process_impl(  {  	return STATUS_OK;  } + +// virtual +LLIOPipe::EStatus LLIOSleeper::process_impl( +	const LLChannelDescriptors& channels, +	buffer_ptr_t& buffer, +	bool& eos, +	LLSD& context, +	LLPumpIO* pump) +{ +	if(!mRespond) +	{ +		lldebugs << "LLIOSleeper::process_impl() sleeping." << llendl; +		mRespond = true; +		static const F64 SLEEP_TIME = 2.0; +		pump->sleepChain(SLEEP_TIME); +		return STATUS_BREAK; +	} +	lldebugs << "LLIOSleeper::process_impl() responding." << llendl; +	LLBufferStream ostr(channels, buffer.get()); +	ostr << "huh? sorry, I was sleeping." << std::endl; +	return STATUS_DONE; +} diff --git a/indra/test/llpipeutil.h b/indra/test/llpipeutil.h index 25311780ac..a52f141d55 100644 --- a/indra/test/llpipeutil.h +++ b/indra/test/llpipeutil.h @@ -145,4 +145,24 @@ protected:  		LLPumpIO* pump);  }; +/** + * @brief Pipe that sleeps, and then responds later. + */ +class LLIOSleeper : public LLIOPipe +{ +public: +	LLIOSleeper() : mRespond(false) {} +	   +protected: +    virtual EStatus process_impl( +		const LLChannelDescriptors& channels, +		buffer_ptr_t& buffer, +		bool& eos, +		LLSD& context, +		LLPumpIO* pump); +private: +	bool mRespond; + +}; +  #endif // LL_LLPIPEUTIL_H diff --git a/indra/test/test.cpp b/indra/test/test.cpp index fc8f8d9711..f573d53ba8 100644 --- a/indra/test/test.cpp +++ b/indra/test/test.cpp @@ -170,6 +170,7 @@ static const apr_getopt_option_t TEST_CL_OPTIONS[] =  	{"group", 'g', 1, "Run test group specified by option argument."},  	{"skip", 's', 1, "Skip test number specified by option argument. Only works when a specific group is being tested"},  	{"wait", 'w', 0, "Wait for input before exit."}, +	{"debug", 'd', 0, "Emit full debug logs."},  	{0, 0, 0, 0}  }; @@ -224,7 +225,8 @@ int main(int argc, char **argv)  	LLError::initForApplication(".");  	LLError::setFatalFunction(wouldHaveCrashed);  	LLError::setDefaultLevel(LLError::LEVEL_ERROR); -		// *FIX: should come from error config file +		//< *TODO: should come from error config file. Note that we +		// have a command line option that sets this to debug.  #ifdef CTYPE_WORKAROUND  	ctype_workaround(); @@ -286,6 +288,11 @@ int main(int argc, char **argv)  		case 'w':  			wait_at_exit = true;  			break; +		case 'd': +			// *TODO: should come from error config file. We set it to +			// ERROR by default, so this allows full debug levels. +			LLError::setDefaultLevel(LLError::LEVEL_DEBUG); +			break;  		default:  			stream_usage(std::cerr, argv[0]);  			return 1; | 
