diff options
author | Nat Goodspeed <nat@lindenlab.com> | 2012-02-16 16:05:04 -0500 |
---|---|---|
committer | Nat Goodspeed <nat@lindenlab.com> | 2012-02-16 16:05:04 -0500 |
commit | e92c3113545dd60fb76e115da201163e340c730c (patch) | |
tree | 4c9a5e231f0e2ad0075a25a7154e7e181434675d /indra | |
parent | 85057908c3f7e48f1dc086ea1c82e672674b2596 (diff) |
Add LLProcess::ReadPipe::find() methods, with corresponding npos.
If it's useful to have contains() to tell you whether incoming data contains a
particular substring, and if it's useful for contains() and peek() to accept
an offset within that data, then it's useful to allow you to get the offset of
a desired substring within that data. But of course a find() returning offset
needs something like std::string::npos for "not found"; borrow that
convention.
Support both find(const std::string&) and find(char); the latter permits a
more efficient implementation. In fact, make find(string) recognize a string
of length 1 and leverage the find(char) implementation.
Given that, reimplement contains(mumble) as shorthand for find(mumble) != npos.
Implement find() overloads using std::search() and std::find() on
boost::asio::streambuf character iterators, rather than copying to std::string
and then using string search like previous contains() implementation.
Reimplement WritePipeImpl::tick() and ReadPipeImpl::tick() to write/read
directly from/to boost::asio::streambuf data, instead of copying to/from a
temporary flat buffer.
As long as ReadPipeImpl::tick() keeps successfully filling buffers, keep
reading. Previous implementation would only handle a long child write over
successive tick() calls. Stop on read error or when we come up short.
Diffstat (limited to 'indra')
-rw-r--r-- | indra/llcommon/llprocess.cpp | 259 | ||||
-rw-r--r-- | indra/llcommon/llprocess.h | 37 | ||||
-rw-r--r-- | indra/llcommon/tests/llprocess_test.cpp | 2 |
3 files changed, 210 insertions, 88 deletions
diff --git a/indra/llcommon/llprocess.cpp b/indra/llcommon/llprocess.cpp index 1481bf571f..aa22b3f805 100644 --- a/indra/llcommon/llprocess.cpp +++ b/indra/llcommon/llprocess.cpp @@ -120,9 +120,12 @@ private: static LLProcessListener sProcessListener; LLProcess::BasePipe::~BasePipe() {} +const LLProcess::BasePipe::size_type + LLProcess::BasePipe::npos((std::numeric_limits<LLProcess::BasePipe::size_type>::max)()); class WritePipeImpl: public LLProcess::WritePipe { + LOG_CLASS(WritePipeImpl); public: WritePipeImpl(const std::string& desc, apr_file_t* pipe): mDesc(desc), @@ -139,30 +142,53 @@ public: bool tick(const LLSD&) { + typedef boost::asio::streambuf::const_buffers_type const_buffer_sequence; // If there's anything to send, try to send it. - if (mStreambuf.size()) + std::size_t total(mStreambuf.size()), consumed(0); + if (total) { - // Copy data out from mStreambuf to a flat, contiguous buffer to - // write -- but only up to a certain size. - std::size_t total(mStreambuf.size()); - std::size_t bufsize((std::min)(std::size_t(4096), total)); - boost::asio::streambuf::const_buffers_type bufs = mStreambuf.data(); - std::vector<char> buffer( - boost::asio::buffers_begin(bufs), - boost::asio::buffers_begin(bufs) + bufsize); - apr_size_t written(bufsize); - ll_apr_warn_status(apr_file_write(mPipe, &buffer[0], &written)); - // 'written' is modified to reflect the number of bytes actually - // written. Since they've been sent, remove them from the + const_buffer_sequence bufs = mStreambuf.data(); + // In general, our streambuf might contain a number of different + // physical buffers; iterate over those. + for (const_buffer_sequence::const_iterator bufi(bufs.begin()), bufend(bufs.end()); + bufi != bufend; ++bufi) + { + // http://www.boost.org/doc/libs/1_49_0_beta1/doc/html/boost_asio/reference/buffer.html#boost_asio.reference.buffer.accessing_buffer_contents + std::size_t towrite(boost::asio::buffer_size(*bufi)); + apr_size_t written(towrite); + apr_status_t err = apr_file_write(mPipe, + boost::asio::buffer_cast<const void*>(*bufi), + &written); + // EAGAIN is exactly what we want from a nonblocking pipe. + // Rather than waiting for data, it should return immediately. + if (! (err == APR_SUCCESS || APR_STATUS_IS_EAGAIN(err))) + { + LL_WARNS("LLProcess") << "apr_file_write(" << towrite << ") on " << mDesc + << " got " << err << ":" << LL_ENDL; + ll_apr_warn_status(err); + } + + // 'written' is modified to reflect the number of bytes actually + // written. Make sure we consume those later. (Don't consume them + // now, that would invalidate the buffer iterator sequence!) + consumed += written; + LL_DEBUGS("LLProcess") << "wrote " << written << " of " << towrite + << " bytes to " << mDesc + << " (original " << total << ")" << LL_ENDL; + + // The parent end of this pipe is nonblocking. If we weren't able + // to write everything we wanted, don't keep banging on it -- that + // won't change until the child reads some. Wait for next tick(). + if (written < towrite) + break; + } + // In all, we managed to write 'consumed' bytes. Remove them from the // streambuf so we don't keep trying to send them. This could be - // anywhere from 0 up to mStreambuf.size(); anything we haven't - // yet sent, we'll try again next tick() call. - mStreambuf.consume(written); - LL_DEBUGS("LLProcess") << "wrote " << written << " of " << bufsize - << " bytes to " << mDesc - << " (original " << total << "), " - << mStreambuf.size() << " remaining" << LL_ENDL; + // anywhere from 0 up to mStreambuf.size(); anything we haven't yet + // sent, we'll try again later. + mStreambuf.consume(consumed); } + return false; } @@ -176,6 +202,7 @@ private: class ReadPipeImpl: public LLProcess::ReadPipe { + LOG_CLASS(ReadPipeImpl); public: ReadPipeImpl(const std::string& desc, apr_file_t* pipe): mDesc(desc), @@ -184,7 +211,7 @@ public: mStream(&mStreambuf), mPump("ReadPipe"), // use funky syntax to call max() to avoid blighted max() macros - mLimit((std::numeric_limits<size_t>::max)()) + mLimit(npos) { mConnection = LLEventPumps::instance().obtain("mainloop") .listen(LLEventPump::inventName("ReadPipe"), @@ -195,79 +222,149 @@ public: // methods with implementation data concealed from the base class. virtual std::istream& get_istream() { return mStream; } virtual LLEventPump& getPump() { return mPump; } - virtual void setLimit(size_t limit) { mLimit = limit; } - virtual size_t getLimit() const { return mLimit; } - virtual std::size_t size() { return mStreambuf.size(); } + virtual void setLimit(size_type limit) { mLimit = limit; } + virtual size_type getLimit() const { return mLimit; } + virtual size_type size() const { return mStreambuf.size(); } - virtual std::string peek(std::size_t offset=0, - std::size_t len=(std::numeric_limits<std::size_t>::max)()) + virtual std::string peek(size_type offset=0, size_type len=npos) const { // Constrain caller's offset and len to overlap actual buffer content. - std::size_t real_offset = (std::min)(mStreambuf.size(), offset); - std::size_t real_end = (std::min)(mStreambuf.size(), real_offset + len); + std::size_t real_offset = (std::min)(mStreambuf.size(), std::size_t(offset)); + std::size_t real_end = (std::min)(mStreambuf.size(), std::size_t(real_offset + len)); boost::asio::streambuf::const_buffers_type cbufs = mStreambuf.data(); return std::string(boost::asio::buffers_begin(cbufs) + real_offset, boost::asio::buffers_begin(cbufs) + real_end); } - virtual bool contains(const std::string& seek, std::size_t offset=0) + virtual size_type find(const std::string& seek, size_type offset=0) const { - // There may be a more efficient way to search mStreambuf contents, - // but this is far the easiest... - return peek(offset).find(seek) != std::string::npos; + // If we're passing a string of length 1, use find(char), which can + // use an O(n) std::find() rather than the O(n^2) std::search(). + if (seek.length() == 1) + { + return find(seek[0], offset); + } + + // If offset is beyond the whole buffer, can't even construct a valid + // iterator range; can't possibly find the string we seek. + if (offset > mStreambuf.size()) + { + return npos; + } + + boost::asio::streambuf::const_buffers_type cbufs = mStreambuf.data(); + boost::asio::buffers_iterator<boost::asio::streambuf::const_buffers_type> + begin(boost::asio::buffers_begin(cbufs)), + end (boost::asio::buffers_end(cbufs)), + found(std::search(begin + offset, end, seek.begin(), seek.end())); + return (found == end)? npos : (found - begin); } -private: - bool tick(const LLSD&) + virtual size_type find(char seek, size_type offset=0) const { - // Allocate a buffer and try, every time, to read into it. - std::vector<char> buffer(4096); - apr_size_t gotten(buffer.size()); - apr_status_t err = apr_file_read(mPipe, &buffer[0], &gotten); - if (err == APR_EOF) + // If offset is beyond the whole buffer, can't even construct a valid + // iterator range; can't possibly find the char we seek. + if (offset > mStreambuf.size()) { - // Handle EOF specially: it's part of normal-case processing. - LL_DEBUGS("LLProcess") << "EOF on " << mDesc << LL_ENDL; - // We won't need any more tick() calls. - mConnection.disconnect(); + return npos; } - else if (! ll_apr_warn_status(err)) // validate anything but EOF + + boost::asio::streambuf::const_buffers_type cbufs = mStreambuf.data(); + boost::asio::buffers_iterator<boost::asio::streambuf::const_buffers_type> + begin(boost::asio::buffers_begin(cbufs)), + end (boost::asio::buffers_end(cbufs)), + found(std::find(begin + offset, end, seek)); + return (found == end)? npos : (found - begin); + } + +private: + bool tick(const LLSD&) + { + typedef boost::asio::streambuf::mutable_buffers_type mutable_buffer_sequence; + // Try, every time, to read into our streambuf. In fact, we have no + // idea how much data the child might be trying to send: keep trying + // until we're convinced we've temporarily exhausted the pipe. + bool exhausted = false; + std::size_t committed(0); + do { - // 'gotten' was modified to reflect the number of bytes actually - // received. If nonzero, add them to the streambuf and notify - // interested parties. - if (gotten) + // attempt to read an arbitrary size + mutable_buffer_sequence bufs = mStreambuf.prepare(4096); + // In general, the mutable_buffer_sequence returned by prepare() might + // contain a number of different physical buffers; iterate over those. + std::size_t tocommit(0); + for (mutable_buffer_sequence::const_iterator bufi(bufs.begin()), bufend(bufs.end()); + bufi != bufend; ++bufi) { - boost::asio::streambuf::mutable_buffers_type mbufs = mStreambuf.prepare(gotten); - std::copy(buffer.begin(), buffer.begin() + gotten, - boost::asio::buffers_begin(mbufs)); - // Don't forget to "commit" the data! The sequence (prepare(), - // commit()) is obviously intended to allow us to allocate - // buffer space, then read directly into some portion of it, - // then commit only as much as we managed to obtain. But the - // only official (documented) way I can find to populate a - // mutable_buffers_type is to use buffers_begin(). It Would Be - // Nice if we were permitted to directly read into - // mutable_buffers_type (not to mention writing directly from - // const_buffers_type in WritePipeImpl; APR even supports an - // apr_file_writev() function for writing from discontiguous - // buffers) -- but as of 2012-02-14, this copying appears to - // be the safest tactic. - mStreambuf.commit(gotten); - LL_DEBUGS("LLProcess") << "read " << gotten << " of " << buffer.size() - << " bytes from " << mDesc << ", new total " - << mStreambuf.size() << LL_ENDL; - - // Now that we've received new data, publish it on our - // LLEventPump as advertised. Constrain it by mLimit. But show - // listener the actual accumulated buffer size, regardless of - // mLimit. - std::size_t datasize((std::min)(mLimit, mStreambuf.size())); - mPump.post(LLSDMap - ("data", peek(0, datasize)) - ("len", LLSD::Integer(mStreambuf.size()))); + // http://www.boost.org/doc/libs/1_49_0_beta1/doc/html/boost_asio/reference/buffer.html#boost_asio.reference.buffer.accessing_buffer_contents + std::size_t toread(boost::asio::buffer_size(*bufi)); + apr_size_t gotten(toread); + apr_status_t err = apr_file_read(mPipe, + boost::asio::buffer_cast<void*>(*bufi), + &gotten); + // EAGAIN is exactly what we want from a nonblocking pipe. + // Rather than waiting for data, it should return immediately. + if (! (err == APR_SUCCESS || APR_STATUS_IS_EAGAIN(err))) + { + // Handle EOF specially: it's part of normal-case processing. + if (err == APR_EOF) + { + LL_DEBUGS("LLProcess") << "EOF on " << mDesc << LL_ENDL; + } + else + { + LL_WARNS("LLProcess") << "apr_file_read(" << toread << ") on " << mDesc + << " got " << err << ":" << LL_ENDL; + ll_apr_warn_status(err); + } + // Either way, though, we won't need any more tick() calls. + mConnection.disconnect(); + exhausted = true; // also break outer retry loop + break; + } + + // 'gotten' was modified to reflect the number of bytes actually + // received. Make sure we commit those later. (Don't commit them + // now, that would invalidate the buffer iterator sequence!) + tocommit += gotten; + LL_DEBUGS("LLProcess") << "read " << gotten << " of " << toread + << " bytes from " << mDesc << LL_ENDL; + + // The parent end of this pipe is nonblocking. If we weren't even + // able to fill this buffer, don't loop to try to fill the next -- + // that won't change until the child writes more. Wait for next + // tick(). + if (gotten < toread) + { + // break outer retry loop too + exhausted = true; + break; + } } + + // Don't forget to "commit" the data! + mStreambuf.commit(tocommit); + committed += tocommit; + + // 'exhausted' is set when we can't fill any one buffer of the + // mutable_buffer_sequence established by the current prepare() + // call -- whether due to error or not enough bytes. That is, + // 'exhausted' is still false when we've filled every physical + // buffer in the mutable_buffer_sequence. In that case, for all we + // know, the child might have still more data pending -- go for it! + } while (! exhausted); + + if (committed) + { + // If we actually received new data, publish it on our LLEventPump + // as advertised. Constrain it by mLimit. But show listener the + // actual accumulated buffer size, regardless of mLimit. + size_type datasize((std::min)(mLimit, size_type(mStreambuf.size()))); + mPump.post(LLSDMap + ("data", peek(0, datasize)) + ("len", LLSD::Integer(mStreambuf.size()))); } + return false; } @@ -277,7 +374,7 @@ private: boost::asio::streambuf mStreambuf; std::istream mStream; LLEventStream mPump; - size_t mLimit; + size_type mLimit; }; /// Need an exception to avoid constructing an invalid LLProcess object, but @@ -472,16 +569,18 @@ LLProcess::LLProcess(const LLSDOrParams& params): { if (select[i] != APR_CHILD_BLOCK) continue; + std::string desc(STRINGIZE(mDesc << ' ' << whichfile[i])); + apr_file_t* pipe(mProcess.*(members[i])); if (i == STDIN) { - mPipes.replace(i, new WritePipeImpl(whichfile[i], mProcess.*(members[i]))); + mPipes.replace(i, new WritePipeImpl(desc, pipe)); } else { - mPipes.replace(i, new ReadPipeImpl(whichfile[i], mProcess.*(members[i]))); + mPipes.replace(i, new ReadPipeImpl(desc, pipe)); } LL_DEBUGS("LLProcess") << "Instantiating " << typeid(mPipes[i]).name() - << "('" << whichfile[i] << "')" << LL_ENDL; + << "('" << desc << "')" << LL_ENDL; } } diff --git a/indra/llcommon/llprocess.h b/indra/llcommon/llprocess.h index bf0517600d..2c6951b562 100644 --- a/indra/llcommon/llprocess.h +++ b/indra/llcommon/llprocess.h @@ -295,6 +295,9 @@ public: { public: virtual ~BasePipe() = 0; + + typedef std::size_t size_type; + static const size_type npos; }; /// As returned by getWritePipe() or getOptWritePipe() @@ -338,7 +341,7 @@ public: * the child, but the child happens to flush "12" before emitting * "3\n", get_istream() >> myint could return 12 rather than 123! */ - virtual std::size_t size() = 0; + virtual size_type size() const = 0; /** * Peek at accumulated buffer data without consuming it. Optional @@ -346,14 +349,32 @@ public: * * @note You can discard buffer data using get_istream().ignore(n). */ - virtual std::string peek(std::size_t offset=0, - std::size_t len=(std::numeric_limits<std::size_t>::max)()) = 0; + virtual std::string peek(size_type offset=0, size_type len=npos) const = 0; + + /** + * Detect presence of a substring (or char) in accumulated buffer data + * without retrieving it. Optional offset allows you to search from + * specified position. + */ + template <typename SEEK> + bool contains(SEEK seek, size_type offset=0) const + { return find(seek, offset) != npos; } + + /** + * Search for a substring in accumulated buffer data without + * retrieving it. Returns size_type position at which found, or npos + * meaning not found. Optional offset allows you to search from + * specified position. + */ + virtual size_type find(const std::string& seek, size_type offset=0) const = 0; /** - * Search accumulated buffer data without retrieving it. Optional - * offset allows you to start at specified position. + * Search for a char in accumulated buffer data without retrieving it. + * Returns size_type position at which found, or npos meaning not + * found. Optional offset allows you to search from specified + * position. */ - virtual bool contains(const std::string& seek, std::size_t offset=0) = 0; + virtual size_type find(char seek, size_type offset=0) const = 0; /** * Get LLEventPump& on which to listen for incoming data. The posted @@ -377,12 +398,12 @@ public: * the data posted with the LLSD event. If you don't call this method, * all pending data will be posted. */ - virtual void setLimit(size_t limit) = 0; + virtual void setLimit(size_type limit) = 0; /** * Query the current setLimit() limit. */ - virtual size_t getLimit() const = 0; + virtual size_type getLimit() const = 0; }; /// Exception thrown by getWritePipe(), getReadPipe() if you didn't ask to diff --git a/indra/llcommon/tests/llprocess_test.cpp b/indra/llcommon/tests/llprocess_test.cpp index 31bc833a1d..d7bda34923 100644 --- a/indra/llcommon/tests/llprocess_test.cpp +++ b/indra/llcommon/tests/llprocess_test.cpp @@ -1131,5 +1131,7 @@ namespace tut // test setLimit(), getLimit() // test EOF -- check logging // test peek() with substr + // test contains(char) + // test find(string, offset), find(char, offset), offset <, =, > size() } // namespace tut |