diff options
author | Nat Goodspeed <nat@lindenlab.com> | 2012-02-29 17:10:19 -0500 |
---|---|---|
committer | Nat Goodspeed <nat@lindenlab.com> | 2012-02-29 17:10:19 -0500 |
commit | 3649eda62ad3a04203e6c562e78815a95896bbd4 (patch) | |
tree | 7c53bb4c4979c8478369efecd525d001fd940b42 /indra | |
parent | 7fd281ac9971caa1dfffd42e6ff16dd44da20179 (diff) |
Guarantee LLProcess::Params::postend listener any ReadPipe data.
Previously one might get process-terminated notification but still have to
wait for the child process's final data to arrive on one or more ReadPipes.
That required complex consumer timing logic to handle incomplete pending
ReadPipe data, e.g. a partial last line with no terminating newline. New code
guarantees that by the time LLProcess sends process-terminated notification,
all pending pipe data will have been buffered in ReadPipes.
Document LLProcess::ReadPipe::getPump() notification event; add "eof" key.
Add LLProcess::ReadPipe::getline() and read() convenience methods.
Add static LLProcess::getline() and basename() convenience methods, publishing
logic already present elsewhere.
Use ReadPipe::getline() and read() in unit tests.
Add unit test for "eof" event on ReadPipe::getPump().
Add unit test verifying that final data have been buffered by termination
notification event.
Diffstat (limited to 'indra')
-rw-r--r-- | indra/llcommon/llprocess.cpp | 107 | ||||
-rw-r--r-- | indra/llcommon/llprocess.h | 31 | ||||
-rw-r--r-- | indra/llcommon/tests/llprocess_test.cpp | 139 |
3 files changed, 210 insertions, 67 deletions
diff --git a/indra/llcommon/llprocess.cpp b/indra/llcommon/llprocess.cpp index 3b17b819bd..edfdebfe87 100644 --- a/indra/llcommon/llprocess.cpp +++ b/indra/llcommon/llprocess.cpp @@ -220,7 +220,8 @@ public: // Essential to initialize our std::istream with our special streambuf! mStream(&mStreambuf), mPump("ReadPipe", true), // tweak name as needed to avoid collisions - mLimit(0) + mLimit(0), + mEOF(false) { mConnection = LLEventPumps::instance().obtain("mainloop") .listen(LLEventPump::inventName("ReadPipe"), @@ -230,11 +231,25 @@ public: // Much of the implementation is simply connecting the abstract virtual // methods with implementation data concealed from the base class. virtual std::istream& get_istream() { return mStream; } + virtual std::string getline() { return LLProcess::getline(mStream); } virtual LLEventPump& getPump() { return mPump; } virtual void setLimit(size_type limit) { mLimit = limit; } virtual size_type getLimit() const { return mLimit; } virtual size_type size() const { return mStreambuf.size(); } + virtual std::string read(size_type len) + { + // Read specified number of bytes into a buffer. Make a buffer big + // enough. + size_type readlen((std::min)(size(), len)); + std::vector<char> buffer(readlen); + mStream.read(&buffer[0], readlen); + // Since we've already clamped 'readlen', we can think of no reason + // why mStream.read() should read fewer than 'readlen' bytes. + // Nonetheless, use the actual retrieved length. + return std::string(&buffer[0], mStream.gcount()); + } + virtual std::string peek(size_type offset=0, size_type len=npos) const { // Constrain caller's offset and len to overlap actual buffer content. @@ -287,14 +302,18 @@ public: return (found == end)? npos : (found - begin); } -private: bool tick(const LLSD&) { + // Once we've hit EOF, skip all the rest of this. + if (mEOF) + return false; + typedef boost::asio::streambuf::mutable_buffers_type mutable_buffer_sequence; // Try, every time, to read into our streambuf. In fact, we have no // idea how much data the child might be trying to send: keep trying // until we're convinced we've temporarily exhausted the pipe. - bool exhausted = false; + enum PipeState { RETRY, EXHAUSTED, CLOSED }; + PipeState state = RETRY; std::size_t committed(0); do { @@ -329,7 +348,9 @@ private: } // Either way, though, we won't need any more tick() calls. mConnection.disconnect(); - exhausted = true; // also break outer retry loop + // Ignore any subsequent calls we might get anyway. + mEOF = true; + state = CLOSED; // also break outer retry loop break; } @@ -347,7 +368,7 @@ private: if (gotten < toread) { // break outer retry loop too - exhausted = true; + state = EXHAUSTED; break; } } @@ -356,15 +377,20 @@ private: mStreambuf.commit(tocommit); committed += tocommit; - // 'exhausted' is set when we can't fill any one buffer of the - // mutable_buffer_sequence established by the current prepare() - // call -- whether due to error or not enough bytes. That is, - // 'exhausted' is still false when we've filled every physical + // state is changed from RETRY when we can't fill any one buffer + // of the mutable_buffer_sequence established by the current + // prepare() call -- whether due to error or not enough bytes. + // That is, if state is still RETRY, we've filled every physical // buffer in the mutable_buffer_sequence. In that case, for all we // know, the child might have still more data pending -- go for it! - } while (! exhausted); - - if (committed) + } while (state == RETRY); + + // Once we recognize that the pipe is closed, make one more call to + // listener. The listener might be waiting for a particular substring + // to arrive, or a particular length of data or something. The event + // with "eof" == true announces that nothing further will arrive, so + // use it or lose it. + if (committed || state == CLOSED) { // If we actually received new data, publish it on our LLEventPump // as advertised. Constrain it by mLimit. But show listener the @@ -373,14 +399,16 @@ private: mPump.post(LLSDMap ("data", peek(0, datasize)) ("len", LLSD::Integer(mStreambuf.size())) - ("index", LLSD::Integer(mIndex)) + ("slot", LLSD::Integer(mIndex)) ("name", whichfile(mIndex)) - ("desc", mDesc)); + ("desc", mDesc) + ("eof", state == CLOSED)); } return false; } +private: std::string mDesc; apr_file_t* mPipe; LLProcess::FILESLOT mIndex; @@ -389,6 +417,7 @@ private: std::istream mStream; LLEventStream mPump; size_type mLimit; + bool mEOF; }; /// Need an exception to avoid constructing an invalid LLProcess object, but @@ -641,17 +670,22 @@ static std::string getDesc(const LLProcess::Params& params) // Caller didn't say. Use the executable name -- but use just the filename // part. On Mac, for instance, full pathnames get cumbersome. + return LLProcess::basename(params.executable); +} + +//static +std::string LLProcess::basename(const std::string& path) +{ // If there are Linden utility functions to manipulate pathnames, I // haven't found them -- and for this usage, Boost.Filesystem seems kind // of heavyweight. - std::string executable(params.executable); - std::string::size_type delim = executable.find_last_of("\\/"); - // If executable contains no pathname delimiters, return the whole thing. + std::string::size_type delim = path.find_last_of("\\/"); + // If path contains no pathname delimiters, return the whole thing. if (delim == std::string::npos) - return executable; + return path; // Return just the part beyond the last delimiter. - return executable.substr(delim + 1); + return path.substr(delim + 1); } LLProcess::~LLProcess() @@ -804,6 +838,24 @@ void LLProcess::handle_status(int reason, int status) // KILLED; refine below. mStatus.mState = EXITED; + // Make last-gasp calls for each of the ReadPipes we have on hand. Since + // they're listening on "mainloop", we can be sure they'll eventually + // collect all pending data from the child. But we want to be able to + // guarantee to our consumer that by the time we post on the "postend" + // LLEventPump, our ReadPipes are already buffering all the data there + // will ever be from the child. That lets the "postend" listener decide + // what to do with that final data. + for (size_t i = 0; i < mPipes.size(); ++i) + { + std::string error; + ReadPipeImpl* ppipe = getPipePtr<ReadPipeImpl>(error, FILESLOT(i)); + if (ppipe) + { + static LLSD trivial; + ppipe->tick(trivial); + } + } + // wi->rv = apr_proc_wait(wi->child, &wi->rc, &wi->why, APR_NOWAIT); // It's just wrong to call apr_proc_wait() here. The only way APR knows to // call us with APR_OC_REASON_DEATH is that it's already reaped this child @@ -919,6 +971,23 @@ boost::optional<LLProcess::ReadPipe&> LLProcess::getOptReadPipe(FILESLOT slot) return getOptPipe<ReadPipe>(slot); } +//static +std::string LLProcess::getline(std::istream& in) +{ + std::string line; + std::getline(in, line); + // Blur the distinction between "\r\n" and plain "\n". std::getline() will + // have eaten the "\n", but we could still end up with a trailing "\r". + std::string::size_type lastpos = line.find_last_not_of("\r"); + if (lastpos != std::string::npos) + { + // Found at least one character that's not a trailing '\r'. SKIP OVER + // IT and erase the rest of the line. + line.erase(lastpos+1); + } + return line; +} + std::ostream& operator<<(std::ostream& out, const LLProcess::Params& params) { if (params.cwd.isProvided()) diff --git a/indra/llcommon/llprocess.h b/indra/llcommon/llprocess.h index 637b7e2f9c..06ada83698 100644 --- a/indra/llcommon/llprocess.h +++ b/indra/llcommon/llprocess.h @@ -156,7 +156,7 @@ public: // set them rather than initialization. if (! tp.empty()) type = tp; if (! nm.empty()) name = nm; - } + } }; /// Param block definition @@ -376,6 +376,18 @@ public: virtual std::istream& get_istream() = 0; /** + * Like std::getline(get_istream(), line), but trims off trailing '\r' + * to make calling code less platform-sensitive. + */ + virtual std::string getline() = 0; + + /** + * Like get_istream().read(buffer, n), but returns std::string rather + * than requiring caller to construct a buffer, etc. + */ + virtual std::string read(size_type len) = 0; + + /** * Get accumulated buffer length. * Often we need to refrain from actually reading the std::istream * returned by get_istream() until we've accumulated enough data to @@ -420,9 +432,15 @@ public: /** * Get LLEventPump& on which to listen for incoming data. The posted - * LLSD::Map event will contain a key "data" whose value is an - * LLSD::String containing (part of) the data accumulated in the - * buffer. + * LLSD::Map event will contain: + * + * - "data" part of pending data; see setLimit() + * - "len" entire length of pending data, regardless of setLimit() + * - "slot" this ReadPipe's FILESLOT, e.g. LLProcess::STDOUT + * - "name" e.g. "stdout" + * - "desc" e.g. "SLPlugin (pid) stdout" + * - "eof" @c true means there no more data will arrive on this pipe, + * therefore no more events on this pump * * If the child sends "abc", and this ReadPipe posts "data"="abc", but * you don't consume it by reading the std::istream returned by @@ -487,6 +505,11 @@ public: */ boost::optional<ReadPipe&> getOptReadPipe(FILESLOT slot); + /// little utilities that really should already be somewhere else in the + /// code base + static std::string basename(const std::string& path); + static std::string getline(std::istream&); + private: /// constructor is private: use create() instead LLProcess(const LLSDOrParams& params); diff --git a/indra/llcommon/tests/llprocess_test.cpp b/indra/llcommon/tests/llprocess_test.cpp index b02a5c0631..3537133a47 100644 --- a/indra/llcommon/tests/llprocess_test.cpp +++ b/indra/llcommon/tests/llprocess_test.cpp @@ -295,22 +295,6 @@ public: LLError::Settings* mOldSettings; }; -std::string getline(std::istream& in) -{ - std::string line; - std::getline(in, line); - // Blur the distinction between "\r\n" and plain "\n". std::getline() will - // have eaten the "\n", but we could still end up with a trailing "\r". - std::string::size_type lastpos = line.find_last_not_of("\r"); - if (lastpos != std::string::npos) - { - // Found at least one character that's not a trailing '\r'. SKIP OVER - // IT and then erase the rest of the line. - line.erase(lastpos+1); - } - return line; -} - /***************************************************************************** * TUT *****************************************************************************/ @@ -1030,7 +1014,7 @@ namespace tut } ensure("script never started", i < timeout); ensure_equals("bad wakeup from stdin/stdout script", - getline(childout.get_istream()), "ok"); + childout.getline(), "ok"); // important to get the implicit flush from std::endl py.mPy->getWritePipe().get_ostream() << "go" << std::endl; for (i = 0; i < timeout && py.mPy->isRunning() && ! childout.contains("\n"); ++i) @@ -1038,7 +1022,7 @@ namespace tut yield(); } ensure("script never replied", childout.contains("\n")); - ensure_equals("child didn't ack", getline(childout.get_istream()), "ack"); + ensure_equals("child didn't ack", childout.getline(), "ack"); ensure_equals("bad child termination", py.mPy->getStatus().mState, LLProcess::EXITED); ensure_equals("bad child exit code", py.mPy->getStatus().mData, 0); } @@ -1130,6 +1114,32 @@ namespace tut template<> template<> void object::test<18>() { + set_test_name("ReadPipe \"eof\" event"); + PythonProcessLauncher py(get_test_name(), + "print 'Hello from Python!'\n"); + py.mParams.files.add(LLProcess::FileParam()); // stdin + py.mParams.files.add(LLProcess::FileParam("pipe")); // stdout + py.launch(); + LLProcess::ReadPipe& childout(py.mPy->getReadPipe(LLProcess::STDOUT)); + EventListener listener(childout.getPump()); + waitfor(*py.mPy); + // We can't be positive there will only be a single event, if the OS + // (or any other intervening layer) does crazy buffering. What we want + // to ensure is that there was exactly ONE event with "eof" true, and + // that it was the LAST event. + std::list<LLSD>::const_reverse_iterator rli(listener.mHistory.rbegin()), + rlend(listener.mHistory.rend()); + ensure("no events", rli != rlend); + ensure("last event not \"eof\"", (*rli)["eof"].asBoolean()); + while (++rli != rlend) + { + ensure("\"eof\" event not last", ! (*rli)["eof"].asBoolean()); + } + } + + template<> template<> + void object::test<19>() + { set_test_name("setLimit()"); PythonProcessLauncher py(get_test_name(), "import sys\n" @@ -1157,7 +1167,7 @@ namespace tut } template<> template<> - void object::test<19>() + void object::test<20>() { set_test_name("peek() ReadPipe data"); PythonProcessLauncher py(get_test_name(), @@ -1210,7 +1220,32 @@ namespace tut } template<> template<> - void object::test<20>() + void object::test<21>() + { + set_test_name("bad postend"); + std::string pumpname("postend"); + EventListener listener(LLEventPumps::instance().obtain(pumpname)); + LLProcess::Params params; + params.desc = get_test_name(); + params.postend = pumpname; + LLProcessPtr child = LLProcess::create(params); + ensure("shouldn't have launched", ! child); + ensure_equals("number of postend events", listener.mHistory.size(), 1); + LLSD postend(listener.mHistory.front()); + ensure("has id", ! postend.has("id")); + ensure_equals("desc", postend["desc"].asString(), std::string(params.desc)); + ensure_equals("state", postend["state"].asInteger(), LLProcess::UNSTARTED); + ensure("has data", ! postend.has("data")); + std::string error(postend["string"]); + // All we get from canned parameter validation is a bool, so the + // "validation failed" message we ourselves generate can't mention + // "executable" by name. Just check that it's nonempty. + //ensure_contains("error", error, "executable"); + ensure("string", ! error.empty()); + } + + template<> template<> + void object::test<22>() { set_test_name("good postend"); PythonProcessLauncher py(get_test_name(), @@ -1240,32 +1275,48 @@ namespace tut ensure_contains("string", str, "35"); } + struct PostendListener + { + PostendListener(LLProcess::ReadPipe& rpipe, + const std::string& pumpname, + const std::string& expect): + mReadPipe(rpipe), + mExpect(expect), + mTriggered(false) + { + LLEventPumps::instance().obtain(pumpname) + .listen("PostendListener", boost::bind(&PostendListener::postend, this, _1)); + } + + bool postend(const LLSD&) + { + mTriggered = true; + ensure_equals("postend listener", mReadPipe.read(mReadPipe.size()), mExpect); + return false; + } + + LLProcess::ReadPipe& mReadPipe; + std::string mExpect; + bool mTriggered; + }; + template<> template<> - void object::test<21>() + void object::test<23>() { - set_test_name("bad postend"); + set_test_name("all data visible at postend"); + PythonProcessLauncher py(get_test_name(), + "import sys\n" + // note, no '\n' in written data + "sys.stdout.write('partial line')\n"); std::string pumpname("postend"); - EventListener listener(LLEventPumps::instance().obtain(pumpname)); - LLProcess::Params params; - params.desc = get_test_name(); - params.postend = pumpname; - LLProcessPtr child = LLProcess::create(params); - ensure("shouldn't have launched", ! child); - ensure_equals("number of postend events", listener.mHistory.size(), 1); - LLSD postend(listener.mHistory.front()); - ensure("has id", ! postend.has("id")); - ensure_equals("desc", postend["desc"].asString(), std::string(params.desc)); - ensure_equals("state", postend["state"].asInteger(), LLProcess::UNSTARTED); - ensure("has data", ! postend.has("data")); - std::string error(postend["string"]); - // All we get from canned parameter validation is a bool, so the - // "validation failed" message we ourselves generate can't mention - // "executable" by name. Just check that it's nonempty. - //ensure_contains("error", error, "executable"); - ensure("string", ! error.empty()); + py.mParams.files.add(LLProcess::FileParam()); // stdin + py.mParams.files.add(LLProcess::FileParam("pipe")); // stdout + py.mParams.postend = pumpname; + py.launch(); + PostendListener listener(py.mPy->getReadPipe(LLProcess::STDOUT), + pumpname, + "partial line"); + waitfor(*py.mPy); + ensure("postend never triggered", listener.mTriggered); } - - // TODO: - // test EOF -- check logging - } // namespace tut |