/** * @file llpluginmessagepipe.cpp * @brief Classes that implement connections from the plugin system to pipes/pumps. * * @cond * $LicenseInfo:firstyear=2008&license=viewerlgpl$ * Second Life Viewer Source Code * Copyright (C) 2010, Linden Research, Inc. * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; * version 2.1 of the License only. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA * * Linden Research, Inc., 945 Battery Street, San Francisco, CA 94111 USA * $/LicenseInfo$ * @endcond */ #include "linden_common.h" #include "llpluginmessagepipe.h" #include "llbufferstream.h" #include "llapr.h" static const char MESSAGE_DELIMITER = '\0'; LLPluginMessagePipeOwner::LLPluginMessagePipeOwner() : mMessagePipe(NULL), mSocketError(APR_SUCCESS) { } // virtual LLPluginMessagePipeOwner::~LLPluginMessagePipeOwner() { killMessagePipe(); } // virtual apr_status_t LLPluginMessagePipeOwner::socketError(apr_status_t error) { mSocketError = error; return error; }; //virtual void LLPluginMessagePipeOwner::setMessagePipe(LLPluginMessagePipe *read_pipe) { // Save a reference to this pipe mMessagePipe = read_pipe; } bool LLPluginMessagePipeOwner::canSendMessage(void) { return (mMessagePipe != NULL); } bool LLPluginMessagePipeOwner::writeMessageRaw(const std::string &message) { bool result = true; if(mMessagePipe != NULL) { result = mMessagePipe->addMessage(message); } else { LL_WARNS("Plugin") << "dropping message: " << message << LL_ENDL; result = false; } return result; } void LLPluginMessagePipeOwner::killMessagePipe(void) { if(mMessagePipe != NULL) { delete mMessagePipe; mMessagePipe = NULL; } } LLPluginMessagePipe::LLPluginMessagePipe(LLPluginMessagePipeOwner *owner, LLSocket::ptr_t socket): mInputMutex(), mOutputMutex(), mOutputStartIndex(0), mOwner(owner), mSocket(socket) { mOwner->setMessagePipe(this); } LLPluginMessagePipe::~LLPluginMessagePipe() { if(mOwner != NULL) { mOwner->setMessagePipe(NULL); } } bool LLPluginMessagePipe::addMessage(const std::string &message) { // queue the message for later output LLMutexLock lock(&mOutputMutex); // If we're starting to use up too much memory, clear if (mOutputStartIndex > 1024 * 1024) { mOutput = mOutput.substr(mOutputStartIndex); mOutputStartIndex = 0; } mOutput += message; mOutput += MESSAGE_DELIMITER; // message separator return true; } void LLPluginMessagePipe::clearOwner(void) { // The owner is done with this pipe. The next call to process_impl should send any remaining data and exit. mOwner = NULL; } void LLPluginMessagePipe::setSocketTimeout(apr_interval_time_t timeout_usec) { // We never want to sleep forever, so force negative timeouts to become non-blocking. // according to this page: http://dev.ariel-networks.com/apr/apr-tutorial/html/apr-tutorial-13.html // blocking/non-blocking with apr sockets is somewhat non-portable. if(timeout_usec <= 0) { // Make the socket non-blocking apr_socket_opt_set(mSocket->getSocket(), APR_SO_NONBLOCK, 1); apr_socket_timeout_set(mSocket->getSocket(), 0); } else { // Make the socket blocking-with-timeout apr_socket_opt_set(mSocket->getSocket(), APR_SO_NONBLOCK, 1); apr_socket_timeout_set(mSocket->getSocket(), timeout_usec); } } bool LLPluginMessagePipe::pump(F64 timeout) { bool result = pumpOutput(); if(result) { result = pumpInput(timeout); } return result; } bool LLPluginMessagePipe::pumpOutput() { bool result = true; if(mSocket) { apr_status_t status; apr_size_t in_size, out_size; LLMutexLock lock(&mOutputMutex); const char * output_data = &(mOutput.data()[mOutputStartIndex]); if(*output_data != '\0') { // write any outgoing messages in_size = (apr_size_t) (mOutput.size() - mOutputStartIndex); out_size = in_size; setSocketTimeout(0); // LL_INFOS("Plugin") << "before apr_socket_send, size = " << size << LL_ENDL; status = apr_socket_send(mSocket->getSocket(), output_data, &out_size); // LL_INFOS("Plugin") << "after apr_socket_send, size = " << size << LL_ENDL; if((status == APR_SUCCESS) || APR_STATUS_IS_EAGAIN(status)) { // Success or Socket buffer is full... // If we've pumped the entire string, clear it if (out_size == in_size) { mOutputStartIndex = 0; mOutput.clear(); } else { llassert(in_size > out_size); // Remove the written part from the buffer and try again later. mOutputStartIndex += out_size; } } else if(APR_STATUS_IS_EOF(status)) { // This is what we normally expect when a plugin exits. //LL_INFOS() << "Got EOF from plugin socket. " << LL_ENDL; if(mOwner) { mOwner->socketError(status); } result = false; } else { // some other error // Treat this as fatal. ll_apr_warn_status(status); if(mOwner) { mOwner->socketError(status); } result = false; } } } return result; } bool LLPluginMessagePipe::pumpInput(F64 timeout) { bool result = true; if(mSocket) { apr_status_t status; apr_size_t size; // FIXME: For some reason, the apr timeout stuff isn't working properly on windows. // Until such time as we figure out why, don't try to use the socket timeout -- just sleep here instead. #if LL_WINDOWS if(result) { if(timeout != 0.0f) { ms_sleep((int)(timeout * 1000.0f)); timeout = 0.0f; } } #endif // Check for incoming messages if(result) { char input_buf[1024]; apr_size_t request_size; if(timeout == 0.0f) { // If we have no timeout, start out with a full read. request_size = sizeof(input_buf); } else { // Start out by reading one byte, so that any data received will wake us up. request_size = 1; } // and use the timeout so we'll sleep if no data is available. setSocketTimeout((apr_interval_time_t)(timeout * 1000000)); while(1) { size = request_size; // LL_INFOS("Plugin") << "before apr_socket_recv, size = " << size << LL_ENDL; status = apr_socket_recv( mSocket->getSocket(), input_buf, &size); // LL_INFOS("Plugin") << "after apr_socket_recv, size = " << size << LL_ENDL; if(size > 0) { LLMutexLock lock(&mInputMutex); mInput.append(input_buf, size); } if(status == APR_SUCCESS) { LL_DEBUGS("PluginSocket") << "success, read " << size << LL_ENDL; if(size != request_size) { // This was a short read, so we're done. break; } } else if(APR_STATUS_IS_TIMEUP(status)) { LL_DEBUGS("PluginSocket") << "TIMEUP, read " << size << LL_ENDL; // Timeout was hit. Since the initial read is 1 byte, this should never be a partial read. break; } else if(APR_STATUS_IS_EAGAIN(status)) { LL_DEBUGS("PluginSocket") << "EAGAIN, read " << size << LL_ENDL; // Non-blocking read returned immediately. break; } else if(APR_STATUS_IS_EOF(status)) { // This is what we normally expect when a plugin exits. //LL_INFOS("PluginSocket") << "Got EOF from plugin socket. " << LL_ENDL; if(mOwner) { mOwner->socketError(status); } result = false; break; } else { // some other error // Treat this as fatal. ll_apr_warn_status(status); if(mOwner) { mOwner->socketError(status); } result = false; break; } if(timeout != 0.0f) { // Second and subsequent reads should not use the timeout setSocketTimeout(0); // and should try to fill the input buffer request_size = sizeof(input_buf); } } processInput(); } } return result; } void LLPluginMessagePipe::processInput(void) { // Look for input delimiter(s) in the input buffer. size_t delim; mInputMutex.lock(); while((delim = mInput.find(MESSAGE_DELIMITER)) != std::string::npos) { // Let the owner process this message if (mOwner) { // Pull the message out of the input buffer before calling receiveMessageRaw. // It's now possible for this function to get called recursively (in the case where the plugin makes a blocking request) // and this guarantees that the messages will get dequeued correctly. std::string message(mInput, 0, delim); mInput.erase(0, delim + 1); mInputMutex.unlock(); mOwner->receiveMessageRaw(message); mInputMutex.lock(); } else { LL_WARNS("Plugin") << "!mOwner" << LL_ENDL; } } mInputMutex.unlock(); }