Diffstat (limited to 'indra/llcommon/llstreamqueue.h')
-rw-r--r--  indra/llcommon/llstreamqueue.h  240
1 files changed, 240 insertions, 0 deletions
diff --git a/indra/llcommon/llstreamqueue.h b/indra/llcommon/llstreamqueue.h
new file mode 100644
index 0000000000..0726bad175
--- /dev/null
+++ b/indra/llcommon/llstreamqueue.h
@@ -0,0 +1,240 @@
+/**
+ * @file llstreamqueue.h
+ * @author Nat Goodspeed
+ * @date 2012-01-04
+ * @brief Definition of LLStreamQueue
+ *
+ * $LicenseInfo:firstyear=2012&license=viewerlgpl$
+ * Copyright (c) 2012, Linden Research, Inc.
+ * $/LicenseInfo$
+ */
+
+#if ! defined(LL_LLSTREAMQUEUE_H)
+#define LL_LLSTREAMQUEUE_H
+
+#include <string>
+#include <list>
+#include <iosfwd> // std::streamsize
+#include <boost/iostreams/categories.hpp>
+
+/**
+ * This class is a growable buffer between a producer and consumer. It serves
+ * as a queue usable with Boost.Iostreams -- hence, a "stream queue."
+ *
+ * This is especially useful for buffering nonblocking I/O. For instance, we
+ * want application logic to be able to serialize LLSD to a std::ostream. We
+ * may write more data than the destination pipe can handle all at once, but
+ * it's imperative NOT to block the application-level serialization call. So
+ * we buffer it instead. Successive frames can try nonblocking writes to the
+ * destination pipe until all buffered data has been sent.
+ *
+ * Similarly, we want application logic to be able to deserialize LLSD from a
+ * std::istream. Again, we must not block that deserialize call waiting for
+ * more data to arrive from the input pipe! Instead we build up a buffer over
+ * a number of frames, using successive nonblocking reads, until we have
+ * "enough" data to be able to present it through a std::istream.
+ *
+ * @note The use cases for this class overlap somewhat with those for the
+ * LLIOPipe/LLPumpIO hierarchies, and indeed we considered using those. This
+ * class has two virtues over the older machinery:
+ *
+ * # It's vastly simpler -- way fewer concepts. It's not clear to me whether
+ * there were ever LLIOPipe/etc. use cases that demanded all the fanciness
+ * rolled in, or whether they were simply overdesigned. In any case, no
+ * remaining Lindens will admit to familiarity with those classes -- and
+ * they're sufficiently obtuse that it would take a considerable learning
+ * curve to figure out how to use them properly. The bottom line is that
+ * current management is not keen on any more engineers climbing that curve.
+ * # This class is designed around available components such as std::string,
+ * std::list, Boost.Iostreams. There's less proprietary code.
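+ *
+ * A minimal usage sketch follows. It assumes <boost/iostreams/stream.hpp>
+ * is available to the caller, and sendNonblocking() stands in for whatever
+ * nonblocking destination pipe the consumer drives; neither is part of
+ * this header.
+ *
+ * @code
+ * LLStreamQueue sq;                  // LLGenericStreamQueue<char>
+ *
+ * // Producer: application logic serializes through a std::ostream facade,
+ * // never blocking on the destination pipe.
+ * boost::iostreams::stream<LLStreamQueue::Sink> out(sq.asSink());
+ * out << "some serialized LLSD or other payload";
+ * out.flush();                       // make sure the chars reach the queue
+ *
+ * // Consumer, once per frame: push whatever the pipe will accept now.
+ * char buf[4096];
+ * std::streamsize got = sq.read(buf, sizeof(buf));
+ * if (got > 0)
+ *     sendNonblocking(buf, got);     // hypothetical nonblocking write
+ * @endcode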
+ */
+template <typename Ch>
+class LLGenericStreamQueue
+{
+public:
+ LLGenericStreamQueue():
+ mSize(0),
+ mClosed(false)
+ {}
+
+ /**
+ * Boost.Iostreams Source Device facade for use with other Boost.Iostreams
+ * functionality. LLGenericStreamQueue doesn't quite fit any of the Boost
+ * 1.48 Iostreams concepts; instead it behaves as both a Sink and a
+ * Source. This is its Source facade.
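+ *
+ * A sketch (assuming <boost/iostreams/stream.hpp>) of presenting buffered
+ * data as a std::istream:
+ *
+ * @code
+ * LLStreamQueue sq;
+ * sq.write("hello\n", 6);
+ * sq.close();                        // producer is done
+ * boost::iostreams::stream<LLStreamQueue::Source> in(sq.asSource());
+ * std::string line;
+ * std::getline(in, line);            // line == "hello"
+ * @endcode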
+ */
+ struct Source
+ {
+ typedef Ch char_type;
+ typedef boost::iostreams::source_tag category;
+
+ /// Bind the underlying LLGenericStreamQueue
+ Source(LLGenericStreamQueue& sq):
+ mStreamQueue(sq)
+ {}
+
+ /// Read up to n characters from the underlying data source into the
+ /// buffer s, returning the number of characters read; return -1 to
+ /// indicate EOF
+ std::streamsize read(Ch* s, std::streamsize n)
+ {
+ return mStreamQueue.read(s, n);
+ }
+
+ LLGenericStreamQueue& mStreamQueue;
+ };
+
+ /**
+ * Boost.Iostreams Sink Device facade for use with other Boost.Iostreams
+ * functionality. LLGenericStreamQueue doesn't quite fit any of the Boost
+ * 1.48 Iostreams concepts; instead it behaves as both a Sink and a
+ * Source. This is its Sink facade.
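+ *
+ * A sketch (assuming <boost/iostreams/stream.hpp>) of presenting the queue
+ * as a std::ostream:
+ *
+ * @code
+ * LLStreamQueue sq;
+ * boost::iostreams::stream<LLStreamQueue::Sink> out(sq.asSink());
+ * out << "queued for later nonblocking delivery";
+ * out.flush();                       // ensure buffered chars reach the queue
+ * sq.close();                        // signal EOF to the eventual consumer
+ * @endcode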
+ */
+ struct Sink
+ {
+ typedef Ch char_type;
+ typedef boost::iostreams::sink_tag category;
+
+ /// Bind the underlying LLGenericStreamQueue
+ Sink(LLGenericStreamQueue& sq):
+ mStreamQueue(sq)
+ {}
+
+ /// Write up to n characters from the buffer s to the output sequence,
+ /// returning the number of characters written
+ std::streamsize write(const Ch* s, std::streamsize n)
+ {
+ return mStreamQueue.write(s, n);
+ }
+
+ /// Send EOF to consumer
+ void close()
+ {
+ mStreamQueue.close();
+ }
+
+ LLGenericStreamQueue& mStreamQueue;
+ };
+
+ /// Present Boost.Iostreams Source facade
+ Source asSource() { return Source(*this); }
+ /// Present Boost.Iostreams Sink facade
+ Sink asSink() { return Sink(*this); }
+
+ /// append data to buffer
+ std::streamsize write(const Ch* s, std::streamsize n)
+ {
+ // Unclear how often we might be asked to write 0 bytes -- perhaps a
+ // naive caller responding to an unready nonblocking read. But if we
+ // do get such a call, don't add a completely empty BufferList entry.
+ if (n == 0)
+ return n;
+ // We could implement this using a single std::string object, a la
+ // ostringstream. But the trouble with appending to a string is that
+ // you might have to recopy all previous contents to grow its size. If
+ // we want this to scale to large data volumes, better to allocate
+ // individual pieces.
+ mBuffer.push_back(string(s, n));
+ mSize += n;
+ return n;
+ }
+
+ /**
+ * Inform this LLGenericStreamQueue that no further data are forthcoming.
+ * For our purposes, close() is strictly a producer-side operation;
+ * there's little point in closing the consumer side.
+ */
+ void close()
+ {
+ mClosed = true;
+ }
+
+ /// consume data from buffer
+ std::streamsize read(Ch* s, std::streamsize n)
+ {
+ // read() is actually a convenience method for peek() followed by
+ // skip().
+ std::streamsize got(peek(s, n));
+ // We can only skip() as many characters as we can peek(); ignore
+ // skip() return here.
+ skip(n);
+ return got;
+ }
+
+ /// Retrieve data from buffer without consuming. Like read(), return -1 on
+ /// EOF.
+ std::streamsize peek(Ch* s, std::streamsize n) const;
+
+ /// Consume data from buffer without retrieving. Unlike read() and peek(),
+ /// at EOF we simply skip 0 characters.
+ std::streamsize skip(std::streamsize n);
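+
+ // A sketch of the peek()/skip() pattern these methods support: inspect
+ // buffered data without consuming it, and consume only once a complete
+ // unit has arrived. isCompleteMessage() and consumeMessage() are
+ // hypothetical helpers, not part of this class.
+ //
+ // char buf[4096];
+ // std::streamsize got = sq.peek(buf, sizeof(buf));
+ // if (got > 0 && isCompleteMessage(buf, got))
+ // {
+ //     consumeMessage(buf, got);
+ //     sq.skip(got);                // now it's safe to discard the bytes
+ // }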
+
+ /// How many characters do we currently have buffered?
+ std::streamsize size() const
+ {
+ return mSize;
+ }
+
+private:
+ typedef std::basic_string<Ch> string;
+ typedef std::list<string> BufferList;
+ BufferList mBuffer;
+ std::streamsize mSize;
+ bool mClosed;
+};
+
+template <typename Ch>
+std::streamsize LLGenericStreamQueue<Ch>::peek(Ch* s, std::streamsize n) const
+{
+ // Here we may have to build up 'n' characters from an arbitrary
+ // number of individual BufferList entries.
+ typename BufferList::const_iterator bli(mBuffer.begin()), blend(mBuffer.end());
+ // Indicate EOF if producer has closed the pipe AND we've exhausted
+ // all previously-buffered data.
+ if (mClosed && bli == blend)
+ {
+ return -1;
+ }
+ // Here either producer hasn't yet closed, or we haven't yet exhausted
+ // remaining data.
+ std::streamsize needed(n), got(0);
+ // Loop until either we run out of BufferList entries or we've
+ // completely satisfied the request.
+ for ( ; bli != blend && needed; ++bli)
+ {
+ std::streamsize chunk(std::min(needed, std::streamsize(bli->length())));
+ std::copy(bli->begin(), bli->begin() + chunk, s);
+ needed -= chunk;
+ s += chunk;
+ got += chunk;
+ }
+ return got;
+}
+
+template <typename Ch>
+std::streamsize LLGenericStreamQueue<Ch>::skip(std::streamsize n)
+{
+ typename BufferList::iterator bli(mBuffer.begin()), blend(mBuffer.end());
+ std::streamsize toskip(n), skipped(0);
+ while (bli != blend && toskip >= std::streamsize(bli->length()))
+ {
+ std::streamsize chunk(bli->length());
+ typename BufferList::iterator zap(bli++);
+ mBuffer.erase(zap);
+ mSize -= chunk;
+ toskip -= chunk;
+ skipped += chunk;
+ }
+ if (bli != blend && toskip)
+ {
+ bli->erase(bli->begin(), bli->begin() + toskip);
+ mSize -= toskip;
+ skipped += toskip;
+ }
+ return skipped;
+}
+
+typedef LLGenericStreamQueue<char> LLStreamQueue;
+typedef LLGenericStreamQueue<wchar_t> LLWStreamQueue;
+
+#endif /* ! defined(LL_LLSTREAMQUEUE_H) */