summaryrefslogtreecommitdiff
path: root/indra/llplugin/llpluginmessagepipe.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'indra/llplugin/llpluginmessagepipe.cpp')
-rw-r--r--indra/llplugin/llpluginmessagepipe.cpp380
1 files changed, 380 insertions, 0 deletions
diff --git a/indra/llplugin/llpluginmessagepipe.cpp b/indra/llplugin/llpluginmessagepipe.cpp
new file mode 100644
index 0000000000..8d13e38ad5
--- /dev/null
+++ b/indra/llplugin/llpluginmessagepipe.cpp
@@ -0,0 +1,380 @@
+/**
+ * @file llpluginmessagepipe.cpp
+ * @brief Classes that implement connections from the plugin system to pipes/pumps.
+ *
+ * @cond
+ * $LicenseInfo:firstyear=2008&license=viewerlgpl$
+ * Second Life Viewer Source Code
+ * Copyright (C) 2010, Linden Research, Inc.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation;
+ * version 2.1 of the License only.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ *
+ * Linden Research, Inc., 945 Battery Street, San Francisco, CA 94111 USA
+ * $/LicenseInfo$
+ * @endcond
+ */
+
+#include "linden_common.h"
+
+#include "llpluginmessagepipe.h"
+#include "llbufferstream.h"
+
+#include "llapr.h"
+
+static const char MESSAGE_DELIMITER = '\0';
+
+LLPluginMessagePipeOwner::LLPluginMessagePipeOwner() :
+ mMessagePipe(NULL),
+ mSocketError(APR_SUCCESS)
+{
+}
+
+// virtual
+LLPluginMessagePipeOwner::~LLPluginMessagePipeOwner()
+{
+ killMessagePipe();
+}
+
+// virtual
+apr_status_t LLPluginMessagePipeOwner::socketError(apr_status_t error)
+{
+ mSocketError = error;
+ return error;
+};
+
+//virtual
+void LLPluginMessagePipeOwner::setMessagePipe(LLPluginMessagePipe *read_pipe)
+{
+ // Save a reference to this pipe
+ mMessagePipe = read_pipe;
+}
+
+bool LLPluginMessagePipeOwner::canSendMessage(void)
+{
+ return (mMessagePipe != NULL);
+}
+
+bool LLPluginMessagePipeOwner::writeMessageRaw(const std::string &message)
+{
+ bool result = true;
+ if(mMessagePipe != NULL)
+ {
+ result = mMessagePipe->addMessage(message);
+ }
+ else
+ {
+ LL_WARNS("Plugin") << "dropping message: " << message << LL_ENDL;
+ result = false;
+ }
+
+ return result;
+}
+
+void LLPluginMessagePipeOwner::killMessagePipe(void)
+{
+ if(mMessagePipe != NULL)
+ {
+ delete mMessagePipe;
+ mMessagePipe = NULL;
+ }
+}
+
+LLPluginMessagePipe::LLPluginMessagePipe(LLPluginMessagePipeOwner *owner, LLSocket::ptr_t socket):
+ mInputMutex(gAPRPoolp),
+ mOutputMutex(gAPRPoolp),
+ mOwner(owner),
+ mSocket(socket)
+{
+
+ mOwner->setMessagePipe(this);
+}
+
+LLPluginMessagePipe::~LLPluginMessagePipe()
+{
+ if(mOwner != NULL)
+ {
+ mOwner->setMessagePipe(NULL);
+ }
+}
+
+bool LLPluginMessagePipe::addMessage(const std::string &message)
+{
+ // queue the message for later output
+ LLMutexLock lock(&mOutputMutex);
+ mOutput += message;
+ mOutput += MESSAGE_DELIMITER; // message separator
+
+ return true;
+}
+
+void LLPluginMessagePipe::clearOwner(void)
+{
+ // The owner is done with this pipe. The next call to process_impl should send any remaining data and exit.
+ mOwner = NULL;
+}
+
+void LLPluginMessagePipe::setSocketTimeout(apr_interval_time_t timeout_usec)
+{
+ // We never want to sleep forever, so force negative timeouts to become non-blocking.
+
+ // according to this page: http://dev.ariel-networks.com/apr/apr-tutorial/html/apr-tutorial-13.html
+ // blocking/non-blocking with apr sockets is somewhat non-portable.
+
+ if(timeout_usec <= 0)
+ {
+ // Make the socket non-blocking
+ apr_socket_opt_set(mSocket->getSocket(), APR_SO_NONBLOCK, 1);
+ apr_socket_timeout_set(mSocket->getSocket(), 0);
+ }
+ else
+ {
+ // Make the socket blocking-with-timeout
+ apr_socket_opt_set(mSocket->getSocket(), APR_SO_NONBLOCK, 1);
+ apr_socket_timeout_set(mSocket->getSocket(), timeout_usec);
+ }
+}
+
+bool LLPluginMessagePipe::pump(F64 timeout)
+{
+ bool result = pumpOutput();
+
+ if(result)
+ {
+ result = pumpInput(timeout);
+ }
+
+ return result;
+}
+
+bool LLPluginMessagePipe::pumpOutput()
+{
+ bool result = true;
+
+ if(mSocket)
+ {
+ apr_status_t status;
+ apr_size_t size;
+
+ LLMutexLock lock(&mOutputMutex);
+ if(!mOutput.empty())
+ {
+ // write any outgoing messages
+ size = (apr_size_t)mOutput.size();
+
+ setSocketTimeout(0);
+
+// LL_INFOS("Plugin") << "before apr_socket_send, size = " << size << LL_ENDL;
+
+ status = apr_socket_send(
+ mSocket->getSocket(),
+ (const char*)mOutput.data(),
+ &size);
+
+// LL_INFOS("Plugin") << "after apr_socket_send, size = " << size << LL_ENDL;
+
+ if(status == APR_SUCCESS)
+ {
+ // success
+ mOutput = mOutput.substr(size);
+ }
+ else if(APR_STATUS_IS_EAGAIN(status))
+ {
+ // Socket buffer is full...
+ // remove the written part from the buffer and try again later.
+ mOutput = mOutput.substr(size);
+ }
+ else if(APR_STATUS_IS_EOF(status))
+ {
+ // This is what we normally expect when a plugin exits.
+ llinfos << "Got EOF from plugin socket. " << llendl;
+
+ if(mOwner)
+ {
+ mOwner->socketError(status);
+ }
+ result = false;
+ }
+ else
+ {
+ // some other error
+ // Treat this as fatal.
+ ll_apr_warn_status(status);
+
+ if(mOwner)
+ {
+ mOwner->socketError(status);
+ }
+ result = false;
+ }
+ }
+ }
+
+ return result;
+}
+
+bool LLPluginMessagePipe::pumpInput(F64 timeout)
+{
+ bool result = true;
+
+ if(mSocket)
+ {
+ apr_status_t status;
+ apr_size_t size;
+
+ // FIXME: For some reason, the apr timeout stuff isn't working properly on windows.
+ // Until such time as we figure out why, don't try to use the socket timeout -- just sleep here instead.
+#if LL_WINDOWS
+ if(result)
+ {
+ if(timeout != 0.0f)
+ {
+ ms_sleep((int)(timeout * 1000.0f));
+ timeout = 0.0f;
+ }
+ }
+#endif
+
+ // Check for incoming messages
+ if(result)
+ {
+ char input_buf[1024];
+ apr_size_t request_size;
+
+ if(timeout == 0.0f)
+ {
+ // If we have no timeout, start out with a full read.
+ request_size = sizeof(input_buf);
+ }
+ else
+ {
+ // Start out by reading one byte, so that any data received will wake us up.
+ request_size = 1;
+ }
+
+ // and use the timeout so we'll sleep if no data is available.
+ setSocketTimeout((apr_interval_time_t)(timeout * 1000000));
+
+ while(1)
+ {
+ size = request_size;
+
+// LL_INFOS("Plugin") << "before apr_socket_recv, size = " << size << LL_ENDL;
+
+ status = apr_socket_recv(
+ mSocket->getSocket(),
+ input_buf,
+ &size);
+
+// LL_INFOS("Plugin") << "after apr_socket_recv, size = " << size << LL_ENDL;
+
+ if(size > 0)
+ {
+ LLMutexLock lock(&mInputMutex);
+ mInput.append(input_buf, size);
+ }
+
+ if(status == APR_SUCCESS)
+ {
+ LL_DEBUGS("PluginSocket") << "success, read " << size << LL_ENDL;
+
+ if(size != request_size)
+ {
+ // This was a short read, so we're done.
+ break;
+ }
+ }
+ else if(APR_STATUS_IS_TIMEUP(status))
+ {
+ LL_DEBUGS("PluginSocket") << "TIMEUP, read " << size << LL_ENDL;
+
+ // Timeout was hit. Since the initial read is 1 byte, this should never be a partial read.
+ break;
+ }
+ else if(APR_STATUS_IS_EAGAIN(status))
+ {
+ LL_DEBUGS("PluginSocket") << "EAGAIN, read " << size << LL_ENDL;
+
+ // Non-blocking read returned immediately.
+ break;
+ }
+ else if(APR_STATUS_IS_EOF(status))
+ {
+ // This is what we normally expect when a plugin exits.
+ LL_INFOS("PluginSocket") << "Got EOF from plugin socket. " << LL_ENDL;
+
+ if(mOwner)
+ {
+ mOwner->socketError(status);
+ }
+ result = false;
+ break;
+ }
+ else
+ {
+ // some other error
+ // Treat this as fatal.
+ ll_apr_warn_status(status);
+
+ if(mOwner)
+ {
+ mOwner->socketError(status);
+ }
+ result = false;
+ break;
+ }
+
+ if(timeout != 0.0f)
+ {
+ // Second and subsequent reads should not use the timeout
+ setSocketTimeout(0);
+ // and should try to fill the input buffer
+ request_size = sizeof(input_buf);
+ }
+ }
+
+ processInput();
+ }
+ }
+
+ return result;
+}
+
+void LLPluginMessagePipe::processInput(void)
+{
+ // Look for input delimiter(s) in the input buffer.
+ int delim;
+ mInputMutex.lock();
+ while((delim = mInput.find(MESSAGE_DELIMITER)) != std::string::npos)
+ {
+ // Let the owner process this message
+ if (mOwner)
+ {
+ // Pull the message out of the input buffer before calling receiveMessageRaw.
+ // It's now possible for this function to get called recursively (in the case where the plugin makes a blocking request)
+ // and this guarantees that the messages will get dequeued correctly.
+ std::string message(mInput, 0, delim);
+ mInput.erase(0, delim + 1);
+ mInputMutex.unlock();
+ mOwner->receiveMessageRaw(message);
+ mInputMutex.lock();
+ }
+ else
+ {
+ LL_WARNS("Plugin") << "!mOwner" << LL_ENDL;
+ }
+ }
+ mInputMutex.unlock();
+}
+