/** * @file llstreamqueue.h * @author Nat Goodspeed * @date 2012-01-04 * @brief Definition of LLStreamQueue * * $LicenseInfo:firstyear=2012&license=viewerlgpl$ * Copyright (c) 2012, Linden Research, Inc. * $/LicenseInfo$ */ #if ! defined(LL_LLSTREAMQUEUE_H) #define LL_LLSTREAMQUEUE_H #include <string> #include <list> #include <iosfwd> // std::streamsize #include <boost/iostreams/categories.hpp> /** * This class is a growable buffer between a producer and consumer. It serves * as a queue usable with Boost.Iostreams -- hence, a "stream queue." * * This is especially useful for buffering nonblocking I/O. For instance, we * want application logic to be able to serialize LLSD to a std::ostream. We * may write more data than the destination pipe can handle all at once, but * it's imperative NOT to block the application-level serialization call. So * we buffer it instead. Successive frames can try nonblocking writes to the * destination pipe until all buffered data has been sent. * * Similarly, we want application logic be able to deserialize LLSD from a * std::istream. Again, we must not block that deserialize call waiting for * more data to arrive from the input pipe! Instead we build up a buffer over * a number of frames, using successive nonblocking reads, until we have * "enough" data to be able to present it through a std::istream. * * @note The use cases for this class overlap somewhat with those for the * LLIOPipe/LLPumpIO hierarchies, and indeed we considered using those. This * class has two virtues over the older machinery: * * # It's vastly simpler -- way fewer concepts. It's not clear to me whether * there were ever LLIOPipe/etc. use cases that demanded all the fanciness * rolled in, or whether they were simply overdesigned. In any case, no * remaining Lindens will admit to familiarity with those classes -- and * they're sufficiently obtuse that it would take considerable learning * curve to figure out how to use them properly. The bottom line is that * current management is not keen on any more engineers climbing that curve. * # This class is designed around available components such as std::string, * std::list, Boost.Iostreams. There's less proprietary code. */ template <typename Ch> class LLGenericStreamQueue { public: LLGenericStreamQueue(): mSize(0), mClosed(false) {} /** * Boost.Iostreams Source Device facade for use with other Boost.Iostreams * functionality. LLGenericStreamQueue doesn't quite fit any of the Boost * 1.48 Iostreams concepts; instead it behaves as both a Sink and a * Source. This is its Source facade. */ struct Source { typedef Ch char_type; typedef boost::iostreams::source_tag category; /// Bind the underlying LLGenericStreamQueue Source(LLGenericStreamQueue& sq): mStreamQueue(sq) {} // Read up to n characters from the underlying data source into the // buffer s, returning the number of characters read; return -1 to // indicate EOF std::streamsize read(Ch* s, std::streamsize n) { return mStreamQueue.read(s, n); } LLGenericStreamQueue& mStreamQueue; }; /** * Boost.Iostreams Sink Device facade for use with other Boost.Iostreams * functionality. LLGenericStreamQueue doesn't quite fit any of the Boost * 1.48 Iostreams concepts; instead it behaves as both a Sink and a * Source. This is its Sink facade. */ struct Sink { typedef Ch char_type; typedef boost::iostreams::sink_tag category; /// Bind the underlying LLGenericStreamQueue Sink(LLGenericStreamQueue& sq): mStreamQueue(sq) {} /// Write up to n characters from the buffer s to the output sequence, /// returning the number of characters written std::streamsize write(const Ch* s, std::streamsize n) { return mStreamQueue.write(s, n); } /// Send EOF to consumer void close() { mStreamQueue.close(); } LLGenericStreamQueue& mStreamQueue; }; /// Present Boost.Iostreams Source facade Source asSource() { return Source(*this); } /// Present Boost.Iostreams Sink facade Sink asSink() { return Sink(*this); } /// append data to buffer std::streamsize write(const Ch* s, std::streamsize n) { // Unclear how often we might be asked to write 0 bytes -- perhaps a // naive caller responding to an unready nonblocking read. But if we // do get such a call, don't add a completely empty BufferList entry. if (n == 0) return n; // We could implement this using a single std::string object, a la // ostringstream. But the trouble with appending to a string is that // you might have to recopy all previous contents to grow its size. If // we want this to scale to large data volumes, better to allocate // individual pieces. mBuffer.push_back(string(s, n)); mSize += n; return n; } /** * Inform this LLGenericStreamQueue that no further data are forthcoming. * For our purposes, close() is strictly a producer-side operation; * there's little point in closing the consumer side. */ void close() { mClosed = true; } /// consume data from buffer std::streamsize read(Ch* s, std::streamsize n) { // read() is actually a convenience method for peek() followed by // skip(). std::streamsize got(peek(s, n)); // We can only skip() as many characters as we can peek(); ignore // skip() return here. skip(n); return got; } /// Retrieve data from buffer without consuming. Like read(), return -1 on /// EOF. std::streamsize peek(Ch* s, std::streamsize n) const; /// Consume data from buffer without retrieving. Unlike read() and peek(), /// at EOF we simply skip 0 characters. std::streamsize skip(std::streamsize n); /// How many characters do we currently have buffered? std::streamsize size() const { return mSize; } private: typedef std::basic_string<Ch> string; typedef std::list<string> BufferList; BufferList mBuffer; std::streamsize mSize; bool mClosed; }; template <typename Ch> std::streamsize LLGenericStreamQueue<Ch>::peek(Ch* s, std::streamsize n) const { // Here we may have to build up 'n' characters from an arbitrary // number of individual BufferList entries. typename BufferList::const_iterator bli(mBuffer.begin()), blend(mBuffer.end()); // Indicate EOF if producer has closed the pipe AND we've exhausted // all previously-buffered data. if (mClosed && bli == blend) { return -1; } // Here either producer hasn't yet closed, or we haven't yet exhausted // remaining data. std::streamsize needed(n), got(0); // Loop until either we run out of BufferList entries or we've // completely satisfied the request. for ( ; bli != blend && needed; ++bli) { std::streamsize chunk(std::min(needed, std::streamsize(bli->length()))); std::copy(bli->begin(), bli->begin() + chunk, s); needed -= chunk; s += chunk; got += chunk; } return got; } template <typename Ch> std::streamsize LLGenericStreamQueue<Ch>::skip(std::streamsize n) { typename BufferList::iterator bli(mBuffer.begin()), blend(mBuffer.end()); std::streamsize toskip(n), skipped(0); while (bli != blend && toskip >= bli->length()) { std::streamsize chunk(bli->length()); typename BufferList::iterator zap(bli++); mBuffer.erase(zap); mSize -= chunk; toskip -= chunk; skipped += chunk; } if (bli != blend && toskip) { bli->erase(bli->begin(), bli->begin() + toskip); mSize -= toskip; skipped += toskip; } return skipped; } typedef LLGenericStreamQueue<char> LLStreamQueue; typedef LLGenericStreamQueue<wchar_t> LLWStreamQueue; #endif /* ! defined(LL_LLSTREAMQUEUE_H) */