summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNat Goodspeed <nat@lindenlab.com>2012-02-16 16:05:04 -0500
committerNat Goodspeed <nat@lindenlab.com>2012-02-16 16:05:04 -0500
commite92c3113545dd60fb76e115da201163e340c730c (patch)
tree4c9a5e231f0e2ad0075a25a7154e7e181434675d
parent85057908c3f7e48f1dc086ea1c82e672674b2596 (diff)
Add LLProcess::ReadPipe::find() methods, with corresponding npos.
If it's useful to have contains() to tell you whether incoming data contains a particular substring, and if it's useful for contains() and peek() to accept an offset within that data, then it's useful to allow you to get the offset of a desired substring within that data. But of course a find() returning offset needs something like std::string::npos for "not found"; borrow that convention. Support both find(const std::string&) and find(char); the latter permits a more efficient implementation. In fact, make find(string) recognize a string of length 1 and leverage the find(char) implementation. Given that, reimplement contains(mumble) as shorthand for find(mumble) != npos. Implement find() overloads using std::search() and std::find() on boost::asio::streambuf character iterators, rather than copying to std::string and then using string search like previous contains() implementation. Reimplement WritePipeImpl::tick() and ReadPipeImpl::tick() to write/read directly from/to boost::asio::streambuf data, instead of copying to/from a temporary flat buffer. As long as ReadPipeImpl::tick() keeps successfully filling buffers, keep reading. Previous implementation would only handle a long child write over successive tick() calls. Stop on read error or when we come up short.
-rw-r--r--indra/llcommon/llprocess.cpp259
-rw-r--r--indra/llcommon/llprocess.h37
-rw-r--r--indra/llcommon/tests/llprocess_test.cpp2
3 files changed, 210 insertions, 88 deletions
diff --git a/indra/llcommon/llprocess.cpp b/indra/llcommon/llprocess.cpp
index 1481bf571f..aa22b3f805 100644
--- a/indra/llcommon/llprocess.cpp
+++ b/indra/llcommon/llprocess.cpp
@@ -120,9 +120,12 @@ private:
static LLProcessListener sProcessListener;
LLProcess::BasePipe::~BasePipe() {}
+const LLProcess::BasePipe::size_type
+ LLProcess::BasePipe::npos((std::numeric_limits<LLProcess::BasePipe::size_type>::max)());
class WritePipeImpl: public LLProcess::WritePipe
{
+ LOG_CLASS(WritePipeImpl);
public:
WritePipeImpl(const std::string& desc, apr_file_t* pipe):
mDesc(desc),
@@ -139,30 +142,53 @@ public:
bool tick(const LLSD&)
{
+ typedef boost::asio::streambuf::const_buffers_type const_buffer_sequence;
// If there's anything to send, try to send it.
- if (mStreambuf.size())
+ std::size_t total(mStreambuf.size()), consumed(0);
+ if (total)
{
- // Copy data out from mStreambuf to a flat, contiguous buffer to
- // write -- but only up to a certain size.
- std::size_t total(mStreambuf.size());
- std::size_t bufsize((std::min)(std::size_t(4096), total));
- boost::asio::streambuf::const_buffers_type bufs = mStreambuf.data();
- std::vector<char> buffer(
- boost::asio::buffers_begin(bufs),
- boost::asio::buffers_begin(bufs) + bufsize);
- apr_size_t written(bufsize);
- ll_apr_warn_status(apr_file_write(mPipe, &buffer[0], &written));
- // 'written' is modified to reflect the number of bytes actually
- // written. Since they've been sent, remove them from the
+ const_buffer_sequence bufs = mStreambuf.data();
+ // In general, our streambuf might contain a number of different
+ // physical buffers; iterate over those.
+ for (const_buffer_sequence::const_iterator bufi(bufs.begin()), bufend(bufs.end());
+ bufi != bufend; ++bufi)
+ {
+ // http://www.boost.org/doc/libs/1_49_0_beta1/doc/html/boost_asio/reference/buffer.html#boost_asio.reference.buffer.accessing_buffer_contents
+ std::size_t towrite(boost::asio::buffer_size(*bufi));
+ apr_size_t written(towrite);
+ apr_status_t err = apr_file_write(mPipe,
+ boost::asio::buffer_cast<const void*>(*bufi),
+ &written);
+ // EAGAIN is exactly what we want from a nonblocking pipe.
+ // Rather than waiting for data, it should return immediately.
+ if (! (err == APR_SUCCESS || APR_STATUS_IS_EAGAIN(err)))
+ {
+ LL_WARNS("LLProcess") << "apr_file_write(" << towrite << ") on " << mDesc
+ << " got " << err << ":" << LL_ENDL;
+ ll_apr_warn_status(err);
+ }
+
+ // 'written' is modified to reflect the number of bytes actually
+ // written. Make sure we consume those later. (Don't consume them
+ // now, that would invalidate the buffer iterator sequence!)
+ consumed += written;
+ LL_DEBUGS("LLProcess") << "wrote " << written << " of " << towrite
+ << " bytes to " << mDesc
+ << " (original " << total << ")" << LL_ENDL;
+
+ // The parent end of this pipe is nonblocking. If we weren't able
+ // to write everything we wanted, don't keep banging on it -- that
+ // won't change until the child reads some. Wait for next tick().
+ if (written < towrite)
+ break;
+ }
+ // In all, we managed to write 'consumed' bytes. Remove them from the
// streambuf so we don't keep trying to send them. This could be
- // anywhere from 0 up to mStreambuf.size(); anything we haven't
- // yet sent, we'll try again next tick() call.
- mStreambuf.consume(written);
- LL_DEBUGS("LLProcess") << "wrote " << written << " of " << bufsize
- << " bytes to " << mDesc
- << " (original " << total << "), "
- << mStreambuf.size() << " remaining" << LL_ENDL;
+ // anywhere from 0 up to mStreambuf.size(); anything we haven't yet
+ // sent, we'll try again later.
+ mStreambuf.consume(consumed);
}
+
return false;
}
@@ -176,6 +202,7 @@ private:
class ReadPipeImpl: public LLProcess::ReadPipe
{
+ LOG_CLASS(ReadPipeImpl);
public:
ReadPipeImpl(const std::string& desc, apr_file_t* pipe):
mDesc(desc),
@@ -184,7 +211,7 @@ public:
mStream(&mStreambuf),
mPump("ReadPipe"),
// use funky syntax to call max() to avoid blighted max() macros
- mLimit((std::numeric_limits<size_t>::max)())
+ mLimit(npos)
{
mConnection = LLEventPumps::instance().obtain("mainloop")
.listen(LLEventPump::inventName("ReadPipe"),
@@ -195,79 +222,149 @@ public:
// methods with implementation data concealed from the base class.
virtual std::istream& get_istream() { return mStream; }
virtual LLEventPump& getPump() { return mPump; }
- virtual void setLimit(size_t limit) { mLimit = limit; }
- virtual size_t getLimit() const { return mLimit; }
- virtual std::size_t size() { return mStreambuf.size(); }
+ virtual void setLimit(size_type limit) { mLimit = limit; }
+ virtual size_type getLimit() const { return mLimit; }
+ virtual size_type size() const { return mStreambuf.size(); }
- virtual std::string peek(std::size_t offset=0,
- std::size_t len=(std::numeric_limits<std::size_t>::max)())
+ virtual std::string peek(size_type offset=0, size_type len=npos) const
{
// Constrain caller's offset and len to overlap actual buffer content.
- std::size_t real_offset = (std::min)(mStreambuf.size(), offset);
- std::size_t real_end = (std::min)(mStreambuf.size(), real_offset + len);
+ std::size_t real_offset = (std::min)(mStreambuf.size(), std::size_t(offset));
+ std::size_t real_end = (std::min)(mStreambuf.size(), std::size_t(real_offset + len));
boost::asio::streambuf::const_buffers_type cbufs = mStreambuf.data();
return std::string(boost::asio::buffers_begin(cbufs) + real_offset,
boost::asio::buffers_begin(cbufs) + real_end);
}
- virtual bool contains(const std::string& seek, std::size_t offset=0)
+ virtual size_type find(const std::string& seek, size_type offset=0) const
{
- // There may be a more efficient way to search mStreambuf contents,
- // but this is far the easiest...
- return peek(offset).find(seek) != std::string::npos;
+ // If we're passing a string of length 1, use find(char), which can
+ // use an O(n) std::find() rather than the O(n^2) std::search().
+ if (seek.length() == 1)
+ {
+ return find(seek[0], offset);
+ }
+
+ // If offset is beyond the whole buffer, can't even construct a valid
+ // iterator range; can't possibly find the string we seek.
+ if (offset > mStreambuf.size())
+ {
+ return npos;
+ }
+
+ boost::asio::streambuf::const_buffers_type cbufs = mStreambuf.data();
+ boost::asio::buffers_iterator<boost::asio::streambuf::const_buffers_type>
+ begin(boost::asio::buffers_begin(cbufs)),
+ end (boost::asio::buffers_end(cbufs)),
+ found(std::search(begin + offset, end, seek.begin(), seek.end()));
+ return (found == end)? npos : (found - begin);
}
-private:
- bool tick(const LLSD&)
+ virtual size_type find(char seek, size_type offset=0) const
{
- // Allocate a buffer and try, every time, to read into it.
- std::vector<char> buffer(4096);
- apr_size_t gotten(buffer.size());
- apr_status_t err = apr_file_read(mPipe, &buffer[0], &gotten);
- if (err == APR_EOF)
+ // If offset is beyond the whole buffer, can't even construct a valid
+ // iterator range; can't possibly find the char we seek.
+ if (offset > mStreambuf.size())
{
- // Handle EOF specially: it's part of normal-case processing.
- LL_DEBUGS("LLProcess") << "EOF on " << mDesc << LL_ENDL;
- // We won't need any more tick() calls.
- mConnection.disconnect();
+ return npos;
}
- else if (! ll_apr_warn_status(err)) // validate anything but EOF
+
+ boost::asio::streambuf::const_buffers_type cbufs = mStreambuf.data();
+ boost::asio::buffers_iterator<boost::asio::streambuf::const_buffers_type>
+ begin(boost::asio::buffers_begin(cbufs)),
+ end (boost::asio::buffers_end(cbufs)),
+ found(std::find(begin + offset, end, seek));
+ return (found == end)? npos : (found - begin);
+ }
+
+private:
+ bool tick(const LLSD&)
+ {
+ typedef boost::asio::streambuf::mutable_buffers_type mutable_buffer_sequence;
+ // Try, every time, to read into our streambuf. In fact, we have no
+ // idea how much data the child might be trying to send: keep trying
+ // until we're convinced we've temporarily exhausted the pipe.
+ bool exhausted = false;
+ std::size_t committed(0);
+ do
{
- // 'gotten' was modified to reflect the number of bytes actually
- // received. If nonzero, add them to the streambuf and notify
- // interested parties.
- if (gotten)
+ // attempt to read an arbitrary size
+ mutable_buffer_sequence bufs = mStreambuf.prepare(4096);
+ // In general, the mutable_buffer_sequence returned by prepare() might
+ // contain a number of different physical buffers; iterate over those.
+ std::size_t tocommit(0);
+ for (mutable_buffer_sequence::const_iterator bufi(bufs.begin()), bufend(bufs.end());
+ bufi != bufend; ++bufi)
{
- boost::asio::streambuf::mutable_buffers_type mbufs = mStreambuf.prepare(gotten);
- std::copy(buffer.begin(), buffer.begin() + gotten,
- boost::asio::buffers_begin(mbufs));
- // Don't forget to "commit" the data! The sequence (prepare(),
- // commit()) is obviously intended to allow us to allocate
- // buffer space, then read directly into some portion of it,
- // then commit only as much as we managed to obtain. But the
- // only official (documented) way I can find to populate a
- // mutable_buffers_type is to use buffers_begin(). It Would Be
- // Nice if we were permitted to directly read into
- // mutable_buffers_type (not to mention writing directly from
- // const_buffers_type in WritePipeImpl; APR even supports an
- // apr_file_writev() function for writing from discontiguous
- // buffers) -- but as of 2012-02-14, this copying appears to
- // be the safest tactic.
- mStreambuf.commit(gotten);
- LL_DEBUGS("LLProcess") << "read " << gotten << " of " << buffer.size()
- << " bytes from " << mDesc << ", new total "
- << mStreambuf.size() << LL_ENDL;
-
- // Now that we've received new data, publish it on our
- // LLEventPump as advertised. Constrain it by mLimit. But show
- // listener the actual accumulated buffer size, regardless of
- // mLimit.
- std::size_t datasize((std::min)(mLimit, mStreambuf.size()));
- mPump.post(LLSDMap
- ("data", peek(0, datasize))
- ("len", LLSD::Integer(mStreambuf.size())));
+ // http://www.boost.org/doc/libs/1_49_0_beta1/doc/html/boost_asio/reference/buffer.html#boost_asio.reference.buffer.accessing_buffer_contents
+ std::size_t toread(boost::asio::buffer_size(*bufi));
+ apr_size_t gotten(toread);
+ apr_status_t err = apr_file_read(mPipe,
+ boost::asio::buffer_cast<void*>(*bufi),
+ &gotten);
+ // EAGAIN is exactly what we want from a nonblocking pipe.
+ // Rather than waiting for data, it should return immediately.
+ if (! (err == APR_SUCCESS || APR_STATUS_IS_EAGAIN(err)))
+ {
+ // Handle EOF specially: it's part of normal-case processing.
+ if (err == APR_EOF)
+ {
+ LL_DEBUGS("LLProcess") << "EOF on " << mDesc << LL_ENDL;
+ }
+ else
+ {
+ LL_WARNS("LLProcess") << "apr_file_read(" << toread << ") on " << mDesc
+ << " got " << err << ":" << LL_ENDL;
+ ll_apr_warn_status(err);
+ }
+ // Either way, though, we won't need any more tick() calls.
+ mConnection.disconnect();
+ exhausted = true; // also break outer retry loop
+ break;
+ }
+
+ // 'gotten' was modified to reflect the number of bytes actually
+ // received. Make sure we commit those later. (Don't commit them
+ // now, that would invalidate the buffer iterator sequence!)
+ tocommit += gotten;
+ LL_DEBUGS("LLProcess") << "read " << gotten << " of " << toread
+ << " bytes from " << mDesc << LL_ENDL;
+
+ // The parent end of this pipe is nonblocking. If we weren't even
+ // able to fill this buffer, don't loop to try to fill the next --
+ // that won't change until the child writes more. Wait for next
+ // tick().
+ if (gotten < toread)
+ {
+ // break outer retry loop too
+ exhausted = true;
+ break;
+ }
}
+
+ // Don't forget to "commit" the data!
+ mStreambuf.commit(tocommit);
+ committed += tocommit;
+
+ // 'exhausted' is set when we can't fill any one buffer of the
+ // mutable_buffer_sequence established by the current prepare()
+ // call -- whether due to error or not enough bytes. That is,
+ // 'exhausted' is still false when we've filled every physical
+ // buffer in the mutable_buffer_sequence. In that case, for all we
+ // know, the child might have still more data pending -- go for it!
+ } while (! exhausted);
+
+ if (committed)
+ {
+ // If we actually received new data, publish it on our LLEventPump
+ // as advertised. Constrain it by mLimit. But show listener the
+ // actual accumulated buffer size, regardless of mLimit.
+ size_type datasize((std::min)(mLimit, size_type(mStreambuf.size())));
+ mPump.post(LLSDMap
+ ("data", peek(0, datasize))
+ ("len", LLSD::Integer(mStreambuf.size())));
}
+
return false;
}
@@ -277,7 +374,7 @@ private:
boost::asio::streambuf mStreambuf;
std::istream mStream;
LLEventStream mPump;
- size_t mLimit;
+ size_type mLimit;
};
/// Need an exception to avoid constructing an invalid LLProcess object, but
@@ -472,16 +569,18 @@ LLProcess::LLProcess(const LLSDOrParams& params):
{
if (select[i] != APR_CHILD_BLOCK)
continue;
+ std::string desc(STRINGIZE(mDesc << ' ' << whichfile[i]));
+ apr_file_t* pipe(mProcess.*(members[i]));
if (i == STDIN)
{
- mPipes.replace(i, new WritePipeImpl(whichfile[i], mProcess.*(members[i])));
+ mPipes.replace(i, new WritePipeImpl(desc, pipe));
}
else
{
- mPipes.replace(i, new ReadPipeImpl(whichfile[i], mProcess.*(members[i])));
+ mPipes.replace(i, new ReadPipeImpl(desc, pipe));
}
LL_DEBUGS("LLProcess") << "Instantiating " << typeid(mPipes[i]).name()
- << "('" << whichfile[i] << "')" << LL_ENDL;
+ << "('" << desc << "')" << LL_ENDL;
}
}
diff --git a/indra/llcommon/llprocess.h b/indra/llcommon/llprocess.h
index bf0517600d..2c6951b562 100644
--- a/indra/llcommon/llprocess.h
+++ b/indra/llcommon/llprocess.h
@@ -295,6 +295,9 @@ public:
{
public:
virtual ~BasePipe() = 0;
+
+ typedef std::size_t size_type;
+ static const size_type npos;
};
/// As returned by getWritePipe() or getOptWritePipe()
@@ -338,7 +341,7 @@ public:
* the child, but the child happens to flush "12" before emitting
* "3\n", get_istream() >> myint could return 12 rather than 123!
*/
- virtual std::size_t size() = 0;
+ virtual size_type size() const = 0;
/**
* Peek at accumulated buffer data without consuming it. Optional
@@ -346,14 +349,32 @@ public:
*
* @note You can discard buffer data using get_istream().ignore(n).
*/
- virtual std::string peek(std::size_t offset=0,
- std::size_t len=(std::numeric_limits<std::size_t>::max)()) = 0;
+ virtual std::string peek(size_type offset=0, size_type len=npos) const = 0;
+
+ /**
+ * Detect presence of a substring (or char) in accumulated buffer data
+ * without retrieving it. Optional offset allows you to search from
+ * specified position.
+ */
+ template <typename SEEK>
+ bool contains(SEEK seek, size_type offset=0) const
+ { return find(seek, offset) != npos; }
+
+ /**
+ * Search for a substring in accumulated buffer data without
+ * retrieving it. Returns size_type position at which found, or npos
+ * meaning not found. Optional offset allows you to search from
+ * specified position.
+ */
+ virtual size_type find(const std::string& seek, size_type offset=0) const = 0;
/**
- * Search accumulated buffer data without retrieving it. Optional
- * offset allows you to start at specified position.
+ * Search for a char in accumulated buffer data without retrieving it.
+ * Returns size_type position at which found, or npos meaning not
+ * found. Optional offset allows you to search from specified
+ * position.
*/
- virtual bool contains(const std::string& seek, std::size_t offset=0) = 0;
+ virtual size_type find(char seek, size_type offset=0) const = 0;
/**
* Get LLEventPump& on which to listen for incoming data. The posted
@@ -377,12 +398,12 @@ public:
* the data posted with the LLSD event. If you don't call this method,
* all pending data will be posted.
*/
- virtual void setLimit(size_t limit) = 0;
+ virtual void setLimit(size_type limit) = 0;
/**
* Query the current setLimit() limit.
*/
- virtual size_t getLimit() const = 0;
+ virtual size_type getLimit() const = 0;
};
/// Exception thrown by getWritePipe(), getReadPipe() if you didn't ask to
diff --git a/indra/llcommon/tests/llprocess_test.cpp b/indra/llcommon/tests/llprocess_test.cpp
index 31bc833a1d..d7bda34923 100644
--- a/indra/llcommon/tests/llprocess_test.cpp
+++ b/indra/llcommon/tests/llprocess_test.cpp
@@ -1131,5 +1131,7 @@ namespace tut
// test setLimit(), getLimit()
// test EOF -- check logging
// test peek() with substr
+ // test contains(char)
+ // test find(string, offset), find(char, offset), offset <, =, > size()
} // namespace tut