summaryrefslogtreecommitdiff
path: root/indra/llcommon/llprocess.cpp
diff options
context:
space:
mode:
authorNat Goodspeed <nat@lindenlab.com>2012-02-15 10:07:09 -0500
committerNat Goodspeed <nat@lindenlab.com>2012-02-15 10:07:09 -0500
commite239cad1f509e3d96011acb61614f2481c46af38 (patch)
treeb6b187c3d774c3f04410b89ce347b8c7dafdfdcd /indra/llcommon/llprocess.cpp
parentaae61392be822218cabcab91d95eb1e75d471764 (diff)
Preliminary pipe support for LLProcess.
Add LLProcess::FileParam to specify how to construct each child's standard file slot, with lots of comments about features designed but not yet implemented. The point is to design it with enough flexibility to be able to extend to foreseeable use cases. Add LLProcess::Params::files to collect up to 3 FileParam items. Naturally this extends the accepted LLSD syntax as well. Implement type="" (child inherits parent file descriptor) and "pipe" (parent constructs anonymous pipe to pass to child). Add LLProcess::FILESLOT enum, plus methods: getReadPipe(FILESLOT), getOptReadPipe(FILESLOT) getWritePipe(), getOptWritePipe() getPipeName(FILESLOT): placeholder implementation for now Add LLProcess::ReadPipe and WritePipe classes, as returned by get*Pipe(). WritePipe supports get_ostream() method for streaming to child stdin. ReadPipe supports get_istream() method for reading from child stdout/stderr. It also provides getPump() returning LLEventPump& so interested parties can listen for arrival of new data on the aforementioned std::istream. For "pipe" slots, instantiate appropriate *Pipe class. ReadPipe and WritePipe classes are pure virtual bases for ReadPipeImpl and WritePipeImpl, respectively: all implementation data are hidden in the latter classes, visible only in llprocess.cpp. In fact each *PipeImpl class registers itself for "mainloop" ticks, attempting nonblocking I/O to the underlying apr_file_t on each tick. Data are buffered in a boost::asio::streambuf, which bridges between std::[io]stream and the APR I/O calls. Sanity-test ReadPipeImpl by using a pipe to absorb the Python "SyntaxError" output from the successful syntax_error test, rather than alarming the user. Add first few unit tests for validating FileParam. More tests coming!
Diffstat (limited to 'indra/llcommon/llprocess.cpp')
-rw-r--r--indra/llcommon/llprocess.cpp321
1 files changed, 305 insertions, 16 deletions
diff --git a/indra/llcommon/llprocess.cpp b/indra/llcommon/llprocess.cpp
index b13e8eb8e0..55eb7e69d3 100644
--- a/indra/llcommon/llprocess.cpp
+++ b/indra/llcommon/llprocess.cpp
@@ -26,6 +26,7 @@
#include "linden_common.h"
#include "llprocess.h"
+#include "llsdutil.h"
#include "llsdserialize.h"
#include "llsingleton.h"
#include "llstring.h"
@@ -36,19 +37,20 @@
#include <boost/foreach.hpp>
#include <boost/bind.hpp>
+#include <boost/asio/streambuf.hpp>
+#include <boost/asio/buffers_iterator.hpp>
#include <iostream>
#include <stdexcept>
+#include <limits>
+#include <algorithm>
+#include <vector>
+#include <typeinfo>
+#include <utility>
+static const char* whichfile[] = { "stdin", "stdout", "stderr" };
static std::string empty;
static LLProcess::Status interpret_status(int status);
-/// Need an exception to avoid constructing an invalid LLProcess object, but
-/// internal use only
-struct LLProcessError: public std::runtime_error
-{
- LLProcessError(const std::string& msg): std::runtime_error(msg) {}
-};
-
/**
* Ref-counted "mainloop" listener. As long as there are still outstanding
* LLProcess objects, keep listening on "mainloop" so we can keep polling APR
@@ -117,6 +119,154 @@ private:
};
static LLProcessListener sProcessListener;
+LLProcess::BasePipe::~BasePipe() {}
+
+class WritePipeImpl: public LLProcess::WritePipe
+{
+public:
+ WritePipeImpl(const std::string& desc, apr_file_t* pipe):
+ mDesc(desc),
+ mPipe(pipe),
+ // Essential to initialize our std::ostream with our special streambuf!
+ mStream(&mStreambuf)
+ {
+ mConnection = LLEventPumps::instance().obtain("mainloop")
+ .listen(LLEventPump::inventName("WritePipe"),
+ boost::bind(&WritePipeImpl::tick, this, _1));
+ }
+
+ virtual std::ostream& get_ostream() { return mStream; }
+
+ bool tick(const LLSD&)
+ {
+ // If there's anything to send, try to send it.
+ if (mStreambuf.size())
+ {
+ // Copy data out from mStreambuf to a flat, contiguous buffer to
+ // write -- but only up to a certain size.
+ std::streamsize total(mStreambuf.size());
+ std::streamsize bufsize((std::min)(4096, total));
+ boost::asio::streambuf::const_buffers_type bufs = mStreambuf.data();
+ std::vector<char> buffer(
+ boost::asio::buffers_begin(bufs),
+ boost::asio::buffers_begin(bufs) + bufsize);
+ apr_size_t written(bufsize);
+ ll_apr_warn_status(apr_file_write(mPipe, &buffer[0], &written));
+ // 'written' is modified to reflect the number of bytes actually
+ // written. Since they've been sent, remove them from the
+ // streambuf so we don't keep trying to send them. This could be
+ // anywhere from 0 up to mStreambuf.size(); anything we haven't
+ // yet sent, we'll try again next tick() call.
+ mStreambuf.consume(written);
+ LL_DEBUGS("LLProcess") << "wrote " << written << " of " << bufsize
+ << " bytes to " << mDesc
+ << " (original " << total << "), "
+ << mStreambuf.size() << " remaining" << LL_ENDL;
+ }
+ return false;
+ }
+
+private:
+ std::string mDesc;
+ apr_file_t* mPipe;
+ LLTempBoundListener mConnection;
+ boost::asio::streambuf mStreambuf;
+ std::ostream mStream;
+};
+
+class ReadPipeImpl: public LLProcess::ReadPipe
+{
+public:
+ ReadPipeImpl(const std::string& desc, apr_file_t* pipe):
+ mDesc(desc),
+ mPipe(pipe),
+ // Essential to initialize our std::istream with our special streambuf!
+ mStream(&mStreambuf),
+ mPump("ReadPipe"),
+ // use funky syntax to call max() to avoid blighted max() macros
+ mLimit((std::numeric_limits<size_t>::max)())
+ {
+ mConnection = LLEventPumps::instance().obtain("mainloop")
+ .listen(LLEventPump::inventName("ReadPipe"),
+ boost::bind(&ReadPipeImpl::tick, this, _1));
+ }
+
+ // Much of the implementation is simply connecting the abstract virtual
+ // methods with implementation data concealed from the base class.
+ virtual std::istream& get_istream() { return mStream; }
+ virtual LLEventPump& getPump() { return mPump; }
+ virtual void setLimit(size_t limit) { mLimit = limit; }
+ virtual size_t getLimit() const { return mLimit; }
+
+private:
+ bool tick(const LLSD&)
+ {
+ // Allocate a buffer and try, every time, to read into it.
+ std::vector<char> buffer(4096);
+ apr_size_t gotten(buffer.size());
+ apr_status_t err = apr_file_read(mPipe, &buffer[0], &gotten);
+ if (err == APR_EOF)
+ {
+ // Handle EOF specially: it's part of normal-case processing.
+ LL_DEBUGS("LLProcess") << "EOF on " << mDesc << LL_ENDL;
+ // We won't need any more tick() calls.
+ mConnection.disconnect();
+ }
+ else if (! ll_apr_warn_status(err)) // validate anything but EOF
+ {
+ // 'gotten' was modified to reflect the number of bytes actually
+ // received. If nonzero, add them to the streambuf and notify
+ // interested parties.
+ if (gotten)
+ {
+ boost::asio::streambuf::mutable_buffers_type mbufs = mStreambuf.prepare(gotten);
+ std::copy(buffer.begin(), buffer.begin() + gotten,
+ boost::asio::buffers_begin(mbufs));
+ // Don't forget to "commit" the data! The sequence (prepare(),
+ // commit()) is obviously intended to allow us to allocate
+ // buffer space, then read directly into some portion of it,
+ // then commit only as much as we managed to obtain. But the
+ // only official (documented) way I can find to populate a
+ // mutable_buffers_type is to use buffers_begin(). It Would Be
+ // Nice if we were permitted to directly read into
+ // mutable_buffers_type (not to mention writing directly from
+ // const_buffers_type in WritePipeImpl; APR even supports an
+ // apr_file_writev() function for writing from discontiguous
+ // buffers) -- but as of 2012-02-14, this copying appears to
+ // be the safest tactic.
+ mStreambuf.commit(gotten);
+ LL_DEBUGS("LLProcess") << "read " << gotten << " of " << buffer.size()
+ << " bytes from " << mDesc << ", new total "
+ << mStreambuf.size() << LL_ENDL;
+
+ // Now that we've received new data, publish it on our
+ // LLEventPump as advertised. Constrain it by mLimit.
+ std::streamsize datasize((std::min)(mLimit, mStreambuf.size()));
+ boost::asio::streambuf::const_buffers_type cbufs = mStreambuf.data();
+ mPump.post(LLSDMap("data", LLSD::String(
+ boost::asio::buffers_begin(cbufs),
+ boost::asio::buffers_begin(cbufs) + datasize)));
+ }
+ }
+ return false;
+ }
+
+ std::string mDesc;
+ apr_file_t* mPipe;
+ LLTempBoundListener mConnection;
+ boost::asio::streambuf mStreambuf;
+ std::istream mStream;
+ LLEventStream mPump;
+ size_t mLimit;
+};
+
+/// Need an exception to avoid constructing an invalid LLProcess object, but
+/// internal use only
+struct LLProcessError: public std::runtime_error
+{
+ LLProcessError(const std::string& msg): std::runtime_error(msg) {}
+};
+
LLProcessPtr LLProcess::create(const LLSDOrParams& params)
{
try
@@ -134,12 +284,23 @@ LLProcessPtr LLProcess::create(const LLSDOrParams& params)
/// throw LLProcessError mentioning the function call that produced that
/// result.
#define chkapr(func) \
- if (ll_apr_warn_status(func)) \
- throw LLProcessError(#func " failed")
+ if (ll_apr_warn_status(func)) \
+ throw LLProcessError(#func " failed")
LLProcess::LLProcess(const LLSDOrParams& params):
- mAutokill(params.autokill)
+ mAutokill(params.autokill),
+ mPipes(NSLOTS)
{
+ // Hmm, when you construct a ptr_vector with a size, it merely reserves
+ // space, it doesn't actually make it that big. Explicitly make it bigger.
+ // Because of ptr_vector's odd semantics, have to push_back(0) the right
+ // number of times! resize() wants to default-construct new BasePipe
+ // instances, which fails because it's pure virtual. But because of the
+ // constructor call, these push_back() calls should require no new
+ // allocation.
+ for (size_t i = 0; i < mPipes.capacity(); ++i)
+ mPipes.push_back(0);
+
if (! params.validateBlock(true))
{
throw LLProcessError(STRINGIZE("not launched: failed parameter validation\n"
@@ -154,16 +315,46 @@ LLProcess::LLProcess(const LLSDOrParams& params):
// apr_procattr_io_set() alternatives: inherit the viewer's own stdxxx
// handle (APR_NO_PIPE, e.g. for stdout, stderr), or create a pipe that's
// blocking on the child end but nonblocking at the viewer end
- // (APR_CHILD_BLOCK). The viewer can't block for anything: the parent end
- // MUST be nonblocking. As the APR documentation itself points out, it
- // makes very little sense to set nonblocking I/O for the child end of a
- // pipe: only a specially-written child could deal with that.
+ // (APR_CHILD_BLOCK).
// Other major options could include explicitly creating a single APR pipe
// and passing it as both stdout and stderr (apr_procattr_child_out_set(),
// apr_procattr_child_err_set()), or accepting a filename, opening it and
// passing that apr_file_t (simple <, >, 2> redirect emulation).
-// chkapr(apr_procattr_io_set(procattr, APR_CHILD_BLOCK, APR_CHILD_BLOCK, APR_CHILD_BLOCK));
- chkapr(apr_procattr_io_set(procattr, APR_NO_PIPE, APR_NO_PIPE, APR_NO_PIPE));
+ std::vector<FileParam> fparams(params.files.begin(), params.files.end());
+ // By default, pass APR_NO_PIPE for each slot.
+ std::vector<apr_int32_t> select(LL_ARRAY_SIZE(whichfile), APR_NO_PIPE);
+ for (size_t i = 0; i < (std::min)(LL_ARRAY_SIZE(whichfile), fparams.size()); ++i)
+ {
+ if (std::string(fparams[i].type).empty()) // inherit our file descriptor
+ {
+ select[i] = APR_NO_PIPE;
+ }
+ else if (std::string(fparams[i].type) == "pipe") // anonymous pipe
+ {
+ if (! std::string(fparams[i].name).empty())
+ {
+ LL_WARNS("LLProcess") << "For " << std::string(params.executable)
+ << ": internal names for reusing pipes ('"
+ << std::string(fparams[i].name) << "' for " << whichfile[i]
+ << ") are not yet supported -- creating distinct pipe"
+ << LL_ENDL;
+ }
+ // The viewer can't block for anything: the parent end MUST be
+ // nonblocking. As the APR documentation itself points out, it
+ // makes very little sense to set nonblocking I/O for the child
+ // end of a pipe: only a specially-written child could deal with
+ // that.
+ select[i] = APR_CHILD_BLOCK;
+ }
+ else
+ {
+ throw LLProcessError(STRINGIZE("For " << std::string(params.executable)
+ << ": unsupported FileParam for " << whichfile[i]
+ << ": type='" << std::string(fparams[i].type)
+ << "', name='" << std::string(fparams[i].name) << "'"));
+ }
+ }
+ chkapr(apr_procattr_io_set(procattr, select[STDIN], select[STDOUT], select[STDERR]));
// Thumbs down on implicitly invoking the shell to invoke the child. From
// our point of view, the other major alternative to APR_PROGRAM_PATH
@@ -251,6 +442,27 @@ LLProcess::LLProcess(const LLSDOrParams& params):
// On Windows, associate the new child process with our Job Object.
autokill();
}
+
+ // Instantiate the proper pipe I/O machinery
+ // want to be able to point to apr_proc_t::in, out, err by index
+ typedef apr_file_t* apr_proc_t::*apr_proc_file_ptr;
+ static apr_proc_file_ptr members[] =
+ { &apr_proc_t::in, &apr_proc_t::out, &apr_proc_t::err };
+ for (size_t i = 0; i < NSLOTS; ++i)
+ {
+ if (select[i] != APR_CHILD_BLOCK)
+ continue;
+ if (i == STDIN)
+ {
+ mPipes.replace(i, new WritePipeImpl(whichfile[i], mProcess.*(members[i])));
+ }
+ else
+ {
+ mPipes.replace(i, new ReadPipeImpl(whichfile[i], mProcess.*(members[i])));
+ }
+ LL_DEBUGS("LLProcess") << "Instantiating " << typeid(mPipes[i]).name()
+ << "('" << whichfile[i] << "')" << LL_ENDL;
+ }
}
LLProcess::~LLProcess()
@@ -428,6 +640,83 @@ LLProcess::handle LLProcess::getProcessHandle() const
#endif
}
+std::string LLProcess::getPipeName(FILESLOT)
+{
+ // LLProcess::FileParam::type "npipe" is not yet implemented
+ return "";
+}
+
+template<class PIPETYPE>
+PIPETYPE* LLProcess::getPipePtr(std::string& error, FILESLOT slot)
+{
+ if (slot >= NSLOTS)
+ {
+ error = STRINGIZE(mDesc << " has no slot " << slot);
+ return NULL;
+ }
+ if (mPipes.is_null(slot))
+ {
+ error = STRINGIZE(mDesc << ' ' << whichfile[slot] << " not a monitored pipe");
+ return NULL;
+ }
+ // Make sure we dynamic_cast in pointer domain so we can test, rather than
+ // accepting runtime's exception.
+ PIPETYPE* ppipe = dynamic_cast<PIPETYPE*>(&mPipes[slot]);
+ if (! ppipe)
+ {
+ error = STRINGIZE(mDesc << ' ' << whichfile[slot] << " not a " << typeid(PIPETYPE).name());
+ return NULL;
+ }
+
+ error.clear();
+ return ppipe;
+}
+
+template <class PIPETYPE>
+PIPETYPE& LLProcess::getPipe(FILESLOT slot)
+{
+ std::string error;
+ PIPETYPE* wp = getPipePtr<PIPETYPE>(error, slot);
+ if (! wp)
+ {
+ throw NoPipe(error);
+ }
+ return *wp;
+}
+
+template <class PIPETYPE>
+boost::optional<PIPETYPE&> LLProcess::getOptPipe(FILESLOT slot)
+{
+ std::string error;
+ PIPETYPE* wp = getPipePtr<PIPETYPE>(error, slot);
+ if (! wp)
+ {
+ LL_DEBUGS("LLProcess") << error << LL_ENDL;
+ return boost::optional<PIPETYPE&>();
+ }
+ return *wp;
+}
+
+LLProcess::WritePipe& LLProcess::getWritePipe(FILESLOT slot)
+{
+ return getPipe<WritePipe>(slot);
+}
+
+boost::optional<LLProcess::WritePipe&> LLProcess::getOptWritePipe(FILESLOT slot)
+{
+ return getOptPipe<WritePipe>(slot);
+}
+
+LLProcess::ReadPipe& LLProcess::getReadPipe(FILESLOT slot)
+{
+ return getPipe<ReadPipe>(slot);
+}
+
+boost::optional<LLProcess::ReadPipe&> LLProcess::getOptReadPipe(FILESLOT slot)
+{
+ return getOptPipe<ReadPipe>(slot);
+}
+
std::ostream& operator<<(std::ostream& out, const LLProcess::Params& params)
{
std::string cwd(params.cwd);