diff options
| author | Nat Goodspeed <nat@lindenlab.com> | 2012-02-16 16:05:04 -0500 | 
|---|---|---|
| committer | Nat Goodspeed <nat@lindenlab.com> | 2012-02-16 16:05:04 -0500 | 
| commit | e92c3113545dd60fb76e115da201163e340c730c (patch) | |
| tree | 4c9a5e231f0e2ad0075a25a7154e7e181434675d | |
| parent | 85057908c3f7e48f1dc086ea1c82e672674b2596 (diff) | |
Add LLProcess::ReadPipe::find() methods, with corresponding npos.
If it's useful to have contains() to tell you whether incoming data contains a
particular substring, and if it's useful for contains() and peek() to accept
an offset within that data, then it's useful to allow you to get the offset of
a desired substring within that data. But of course a find() returning offset
needs something like std::string::npos for "not found"; borrow that
convention.
Support both find(const std::string&) and find(char); the latter permits a
more efficient implementation. In fact, make find(string) recognize a string
of length 1 and leverage the find(char) implementation.
Given that, reimplement contains(mumble) as shorthand for find(mumble) != npos.
Implement find() overloads using std::search() and std::find() on
boost::asio::streambuf character iterators, rather than copying to std::string
and then using string search like previous contains() implementation.
Reimplement WritePipeImpl::tick() and ReadPipeImpl::tick() to write/read
directly from/to boost::asio::streambuf data, instead of copying to/from a
temporary flat buffer.
As long as ReadPipeImpl::tick() keeps successfully filling buffers, keep
reading. Previous implementation would only handle a long child write over
successive tick() calls. Stop on read error or when we come up short.
| -rw-r--r-- | indra/llcommon/llprocess.cpp | 259 | ||||
| -rw-r--r-- | indra/llcommon/llprocess.h | 37 | ||||
| -rw-r--r-- | indra/llcommon/tests/llprocess_test.cpp | 2 | 
3 files changed, 210 insertions, 88 deletions
| diff --git a/indra/llcommon/llprocess.cpp b/indra/llcommon/llprocess.cpp index 1481bf571f..aa22b3f805 100644 --- a/indra/llcommon/llprocess.cpp +++ b/indra/llcommon/llprocess.cpp @@ -120,9 +120,12 @@ private:  static LLProcessListener sProcessListener;  LLProcess::BasePipe::~BasePipe() {} +const LLProcess::BasePipe::size_type +	  LLProcess::BasePipe::npos((std::numeric_limits<LLProcess::BasePipe::size_type>::max)());  class WritePipeImpl: public LLProcess::WritePipe  { +    LOG_CLASS(WritePipeImpl);  public:  	WritePipeImpl(const std::string& desc, apr_file_t* pipe):  		mDesc(desc), @@ -139,30 +142,53 @@ public:  	bool tick(const LLSD&)  	{ +		typedef boost::asio::streambuf::const_buffers_type const_buffer_sequence;  		// If there's anything to send, try to send it. -		if (mStreambuf.size()) +		std::size_t total(mStreambuf.size()), consumed(0); +		if (total)  		{ -			// Copy data out from mStreambuf to a flat, contiguous buffer to -			// write -- but only up to a certain size. -			std::size_t total(mStreambuf.size()); -			std::size_t bufsize((std::min)(std::size_t(4096), total)); -			boost::asio::streambuf::const_buffers_type bufs = mStreambuf.data(); -			std::vector<char> buffer( -				boost::asio::buffers_begin(bufs), -				boost::asio::buffers_begin(bufs) + bufsize); -			apr_size_t written(bufsize); -			ll_apr_warn_status(apr_file_write(mPipe, &buffer[0], &written)); -			// 'written' is modified to reflect the number of bytes actually -			// written. Since they've been sent, remove them from the +			const_buffer_sequence bufs = mStreambuf.data(); +			// In general, our streambuf might contain a number of different +			// physical buffers; iterate over those. +			for (const_buffer_sequence::const_iterator bufi(bufs.begin()), bufend(bufs.end()); +				 bufi != bufend; ++bufi) +			{ +				// http://www.boost.org/doc/libs/1_49_0_beta1/doc/html/boost_asio/reference/buffer.html#boost_asio.reference.buffer.accessing_buffer_contents +				std::size_t towrite(boost::asio::buffer_size(*bufi)); +				apr_size_t written(towrite); +				apr_status_t err = apr_file_write(mPipe, +												  boost::asio::buffer_cast<const void*>(*bufi), +												  &written); +				// EAGAIN is exactly what we want from a nonblocking pipe. +				// Rather than waiting for data, it should return immediately. +				if (! (err == APR_SUCCESS || APR_STATUS_IS_EAGAIN(err))) +				{ +					LL_WARNS("LLProcess") << "apr_file_write(" << towrite << ") on " << mDesc +										  << " got " << err << ":" << LL_ENDL; +					ll_apr_warn_status(err); +				} + +				// 'written' is modified to reflect the number of bytes actually +				// written. Make sure we consume those later. (Don't consume them +				// now, that would invalidate the buffer iterator sequence!) +				consumed += written; +				LL_DEBUGS("LLProcess") << "wrote " << written << " of " << towrite +									   << " bytes to " << mDesc +									   << " (original " << total << ")" << LL_ENDL; + +				// The parent end of this pipe is nonblocking. If we weren't able +				// to write everything we wanted, don't keep banging on it -- that +				// won't change until the child reads some. Wait for next tick(). +				if (written < towrite) +					break; +			} +			// In all, we managed to write 'consumed' bytes. Remove them from the  			// streambuf so we don't keep trying to send them. This could be -			// anywhere from 0 up to mStreambuf.size(); anything we haven't -			// yet sent, we'll try again next tick() call. -			mStreambuf.consume(written); -			LL_DEBUGS("LLProcess") << "wrote " << written << " of " << bufsize -								   << " bytes to " << mDesc -								   << " (original " << total << "), " -								   << mStreambuf.size() << " remaining" << LL_ENDL; +			// anywhere from 0 up to mStreambuf.size(); anything we haven't yet +			// sent, we'll try again later. +			mStreambuf.consume(consumed);  		} +  		return false;  	} @@ -176,6 +202,7 @@ private:  class ReadPipeImpl: public LLProcess::ReadPipe  { +    LOG_CLASS(ReadPipeImpl);  public:  	ReadPipeImpl(const std::string& desc, apr_file_t* pipe):  		mDesc(desc), @@ -184,7 +211,7 @@ public:  		mStream(&mStreambuf),  		mPump("ReadPipe"),  		// use funky syntax to call max() to avoid blighted max() macros -		mLimit((std::numeric_limits<size_t>::max)()) +		mLimit(npos)  	{  		mConnection = LLEventPumps::instance().obtain("mainloop")  			.listen(LLEventPump::inventName("ReadPipe"), @@ -195,79 +222,149 @@ public:  	// methods with implementation data concealed from the base class.  	virtual std::istream& get_istream() { return mStream; }  	virtual LLEventPump& getPump() { return mPump; } -	virtual void setLimit(size_t limit) { mLimit = limit; } -	virtual size_t getLimit() const { return mLimit; } -    virtual std::size_t size() { return mStreambuf.size(); } +	virtual void setLimit(size_type limit) { mLimit = limit; } +	virtual size_type getLimit() const { return mLimit; } +	virtual size_type size() const { return mStreambuf.size(); } -	virtual std::string peek(std::size_t offset=0, -							 std::size_t len=(std::numeric_limits<std::size_t>::max)()) +	virtual std::string peek(size_type offset=0, size_type len=npos) const  	{  		// Constrain caller's offset and len to overlap actual buffer content. -		std::size_t real_offset = (std::min)(mStreambuf.size(), offset); -		std::size_t real_end	= (std::min)(mStreambuf.size(), real_offset + len); +		std::size_t real_offset = (std::min)(mStreambuf.size(), std::size_t(offset)); +		std::size_t real_end	= (std::min)(mStreambuf.size(), std::size_t(real_offset + len));  		boost::asio::streambuf::const_buffers_type cbufs = mStreambuf.data();  		return std::string(boost::asio::buffers_begin(cbufs) + real_offset,  						   boost::asio::buffers_begin(cbufs) + real_end);  	} -	virtual bool contains(const std::string& seek, std::size_t offset=0) +	virtual size_type find(const std::string& seek, size_type offset=0) const  	{ -		// There may be a more efficient way to search mStreambuf contents, -		// but this is far the easiest... -		return peek(offset).find(seek) != std::string::npos; +		// If we're passing a string of length 1, use find(char), which can +		// use an O(n) std::find() rather than the O(n^2) std::search(). +		if (seek.length() == 1) +		{ +			return find(seek[0], offset); +		} + +		// If offset is beyond the whole buffer, can't even construct a valid +		// iterator range; can't possibly find the string we seek. +		if (offset > mStreambuf.size()) +		{ +			return npos; +		} + +		boost::asio::streambuf::const_buffers_type cbufs = mStreambuf.data(); +		boost::asio::buffers_iterator<boost::asio::streambuf::const_buffers_type> +			begin(boost::asio::buffers_begin(cbufs)), +			end	 (boost::asio::buffers_end(cbufs)), +			found(std::search(begin + offset, end, seek.begin(), seek.end())); +		return (found == end)? npos : (found - begin);  	} -private: -	bool tick(const LLSD&) +	virtual size_type find(char seek, size_type offset=0) const  	{ -		// Allocate a buffer and try, every time, to read into it. -		std::vector<char> buffer(4096); -		apr_size_t gotten(buffer.size()); -		apr_status_t err = apr_file_read(mPipe, &buffer[0], &gotten); -		if (err == APR_EOF) +		// If offset is beyond the whole buffer, can't even construct a valid +		// iterator range; can't possibly find the char we seek. +		if (offset > mStreambuf.size())  		{ -			// Handle EOF specially: it's part of normal-case processing. -			LL_DEBUGS("LLProcess") << "EOF on " << mDesc << LL_ENDL; -			// We won't need any more tick() calls. -			mConnection.disconnect(); +			return npos;  		} -		else if (! ll_apr_warn_status(err)) // validate anything but EOF + +		boost::asio::streambuf::const_buffers_type cbufs = mStreambuf.data(); +		boost::asio::buffers_iterator<boost::asio::streambuf::const_buffers_type> +			begin(boost::asio::buffers_begin(cbufs)), +			end	 (boost::asio::buffers_end(cbufs)), +			found(std::find(begin + offset, end, seek)); +		return (found == end)? npos : (found - begin); +	} + +private: +	bool tick(const LLSD&) +	{ +		typedef boost::asio::streambuf::mutable_buffers_type mutable_buffer_sequence; +		// Try, every time, to read into our streambuf. In fact, we have no +		// idea how much data the child might be trying to send: keep trying +		// until we're convinced we've temporarily exhausted the pipe. +		bool exhausted = false; +		std::size_t committed(0); +		do  		{ -			// 'gotten' was modified to reflect the number of bytes actually -			// received. If nonzero, add them to the streambuf and notify -			// interested parties. -			if (gotten) +			// attempt to read an arbitrary size +			mutable_buffer_sequence bufs = mStreambuf.prepare(4096); +			// In general, the mutable_buffer_sequence returned by prepare() might +			// contain a number of different physical buffers; iterate over those. +			std::size_t tocommit(0); +			for (mutable_buffer_sequence::const_iterator bufi(bufs.begin()), bufend(bufs.end()); +				 bufi != bufend; ++bufi)  			{ -				boost::asio::streambuf::mutable_buffers_type mbufs = mStreambuf.prepare(gotten); -				std::copy(buffer.begin(), buffer.begin() + gotten, -						  boost::asio::buffers_begin(mbufs)); -				// Don't forget to "commit" the data! The sequence (prepare(), -				// commit()) is obviously intended to allow us to allocate -				// buffer space, then read directly into some portion of it, -				// then commit only as much as we managed to obtain. But the -				// only official (documented) way I can find to populate a -				// mutable_buffers_type is to use buffers_begin(). It Would Be -				// Nice if we were permitted to directly read into -				// mutable_buffers_type (not to mention writing directly from -				// const_buffers_type in WritePipeImpl; APR even supports an -				// apr_file_writev() function for writing from discontiguous -				// buffers) -- but as of 2012-02-14, this copying appears to -				// be the safest tactic. -				mStreambuf.commit(gotten); -				LL_DEBUGS("LLProcess") << "read " << gotten << " of " << buffer.size() -									   << " bytes from " << mDesc << ", new total " -									   << mStreambuf.size() << LL_ENDL; - -				// Now that we've received new data, publish it on our -				// LLEventPump as advertised. Constrain it by mLimit. But show -				// listener the actual accumulated buffer size, regardless of -				// mLimit. -				std::size_t datasize((std::min)(mLimit, mStreambuf.size())); -				mPump.post(LLSDMap -						   ("data", peek(0, datasize)) -						   ("len", LLSD::Integer(mStreambuf.size()))); +				// http://www.boost.org/doc/libs/1_49_0_beta1/doc/html/boost_asio/reference/buffer.html#boost_asio.reference.buffer.accessing_buffer_contents +				std::size_t toread(boost::asio::buffer_size(*bufi)); +				apr_size_t gotten(toread); +				apr_status_t err = apr_file_read(mPipe, +												 boost::asio::buffer_cast<void*>(*bufi), +												 &gotten); +				// EAGAIN is exactly what we want from a nonblocking pipe. +				// Rather than waiting for data, it should return immediately. +				if (! (err == APR_SUCCESS || APR_STATUS_IS_EAGAIN(err))) +				{ +					// Handle EOF specially: it's part of normal-case processing. +					if (err == APR_EOF) +					{ +						LL_DEBUGS("LLProcess") << "EOF on " << mDesc << LL_ENDL; +					} +					else +					{ +						LL_WARNS("LLProcess") << "apr_file_read(" << toread << ") on " << mDesc +											  << " got " << err << ":" << LL_ENDL; +						ll_apr_warn_status(err); +					} +					// Either way, though, we won't need any more tick() calls. +					mConnection.disconnect(); +					exhausted = true; // also break outer retry loop +					break; +				} + +				// 'gotten' was modified to reflect the number of bytes actually +				// received. Make sure we commit those later. (Don't commit them +				// now, that would invalidate the buffer iterator sequence!) +				tocommit += gotten; +				LL_DEBUGS("LLProcess") << "read " << gotten << " of " << toread +									   << " bytes from " << mDesc << LL_ENDL; + +				// The parent end of this pipe is nonblocking. If we weren't even +				// able to fill this buffer, don't loop to try to fill the next -- +				// that won't change until the child writes more. Wait for next +				// tick(). +				if (gotten < toread) +				{ +					// break outer retry loop too +					exhausted = true; +					break; +				}  			} + +			// Don't forget to "commit" the data! +			mStreambuf.commit(tocommit); +			committed += tocommit; + +			// 'exhausted' is set when we can't fill any one buffer of the +			// mutable_buffer_sequence established by the current prepare() +			// call -- whether due to error or not enough bytes. That is, +			// 'exhausted' is still false when we've filled every physical +			// buffer in the mutable_buffer_sequence. In that case, for all we +			// know, the child might have still more data pending -- go for it! +		} while (! exhausted); + +		if (committed) +		{ +			// If we actually received new data, publish it on our LLEventPump +			// as advertised. Constrain it by mLimit. But show listener the +			// actual accumulated buffer size, regardless of mLimit. +			size_type datasize((std::min)(mLimit, size_type(mStreambuf.size()))); +			mPump.post(LLSDMap +					   ("data", peek(0, datasize)) +					   ("len", LLSD::Integer(mStreambuf.size())));  		} +  		return false;  	} @@ -277,7 +374,7 @@ private:  	boost::asio::streambuf mStreambuf;  	std::istream mStream;  	LLEventStream mPump; -	size_t mLimit; +	size_type mLimit;  };  /// Need an exception to avoid constructing an invalid LLProcess object, but @@ -472,16 +569,18 @@ LLProcess::LLProcess(const LLSDOrParams& params):  	{  		if (select[i] != APR_CHILD_BLOCK)  			continue; +		std::string desc(STRINGIZE(mDesc << ' ' << whichfile[i])); +		apr_file_t* pipe(mProcess.*(members[i]));  		if (i == STDIN)  		{ -			mPipes.replace(i, new WritePipeImpl(whichfile[i], mProcess.*(members[i]))); +			mPipes.replace(i, new WritePipeImpl(desc, pipe));  		}  		else  		{ -			mPipes.replace(i, new ReadPipeImpl(whichfile[i], mProcess.*(members[i]))); +			mPipes.replace(i, new ReadPipeImpl(desc, pipe));  		}  		LL_DEBUGS("LLProcess") << "Instantiating " << typeid(mPipes[i]).name() -							   << "('" << whichfile[i] << "')" << LL_ENDL; +							   << "('" << desc << "')" << LL_ENDL;  	}  } diff --git a/indra/llcommon/llprocess.h b/indra/llcommon/llprocess.h index bf0517600d..2c6951b562 100644 --- a/indra/llcommon/llprocess.h +++ b/indra/llcommon/llprocess.h @@ -295,6 +295,9 @@ public:  	{  	public:  		virtual ~BasePipe() = 0; + +		typedef std::size_t size_type; +		static const size_type npos;  	};  	/// As returned by getWritePipe() or getOptWritePipe() @@ -338,7 +341,7 @@ public:  		 * the child, but the child happens to flush "12" before emitting  		 * "3\n", get_istream() >> myint could return 12 rather than 123!  		 */ -		virtual std::size_t size() = 0; +		virtual size_type size() const = 0;  		/**  		 * Peek at accumulated buffer data without consuming it. Optional @@ -346,14 +349,32 @@ public:  		 *  		 * @note You can discard buffer data using get_istream().ignore(n).  		 */ -		virtual std::string peek(std::size_t offset=0, -								 std::size_t len=(std::numeric_limits<std::size_t>::max)()) = 0; +		virtual std::string peek(size_type offset=0, size_type len=npos) const = 0; + +		/** +		 * Detect presence of a substring (or char) in accumulated buffer data +		 * without retrieving it. Optional offset allows you to search from +		 * specified position. +		 */ +		template <typename SEEK> +		bool contains(SEEK seek, size_type offset=0) const +		{ return find(seek, offset) != npos; } + +		/** +		 * Search for a substring in accumulated buffer data without +		 * retrieving it. Returns size_type position at which found, or npos +		 * meaning not found. Optional offset allows you to search from +		 * specified position. +		 */ +		virtual size_type find(const std::string& seek, size_type offset=0) const = 0;  		/** -		 * Search accumulated buffer data without retrieving it. Optional -		 * offset allows you to start at specified position. +		 * Search for a char in accumulated buffer data without retrieving it. +		 * Returns size_type position at which found, or npos meaning not +		 * found. Optional offset allows you to search from specified +		 * position.  		 */ -		virtual bool contains(const std::string& seek, std::size_t offset=0) = 0; +		virtual size_type find(char seek, size_type offset=0) const = 0;  		/**  		 * Get LLEventPump& on which to listen for incoming data. The posted @@ -377,12 +398,12 @@ public:  		 * the data posted with the LLSD event. If you don't call this method,  		 * all pending data will be posted.  		 */ -		virtual void setLimit(size_t limit) = 0; +		virtual void setLimit(size_type limit) = 0;  		/**  		 * Query the current setLimit() limit.  		 */ -		virtual size_t getLimit() const = 0; +		virtual size_type getLimit() const = 0;  	};  	/// Exception thrown by getWritePipe(), getReadPipe() if you didn't ask to diff --git a/indra/llcommon/tests/llprocess_test.cpp b/indra/llcommon/tests/llprocess_test.cpp index 31bc833a1d..d7bda34923 100644 --- a/indra/llcommon/tests/llprocess_test.cpp +++ b/indra/llcommon/tests/llprocess_test.cpp @@ -1131,5 +1131,7 @@ namespace tut      // test setLimit(), getLimit()      // test EOF -- check logging      // test peek() with substr +    // test contains(char) +    // test find(string, offset), find(char, offset), offset <, =, > size()  } // namespace tut | 
