Diffstat (limited to 'indra/llplugin')
-rw-r--r--  indra/llplugin/llpluginmessagepipe.cpp    104
-rw-r--r--  indra/llplugin/llpluginmessagepipe.h        9
-rw-r--r--  indra/llplugin/llpluginprocesschild.cpp     9
-rw-r--r--  indra/llplugin/llpluginprocessparent.cpp  426
-rw-r--r--  indra/llplugin/llpluginprocessparent.h     26
5 files changed, 523 insertions(+), 51 deletions(-)
diff --git a/indra/llplugin/llpluginmessagepipe.cpp b/indra/llplugin/llpluginmessagepipe.cpp
index e524c88cf8..89f8b44569 100644
--- a/indra/llplugin/llpluginmessagepipe.cpp
+++ b/indra/llplugin/llpluginmessagepipe.cpp
@@ -96,11 +96,14 @@ void LLPluginMessagePipeOwner::killMessagePipe(void)
}
}
-LLPluginMessagePipe::LLPluginMessagePipe(LLPluginMessagePipeOwner *owner, LLSocket::ptr_t socket)
+LLPluginMessagePipe::LLPluginMessagePipe(LLPluginMessagePipeOwner *owner, LLSocket::ptr_t socket):
+ mInputMutex(gAPRPoolp),
+ mOutputMutex(gAPRPoolp),
+ mOwner(owner),
+ mSocket(socket)
{
- mOwner = owner;
+
mOwner->setMessagePipe(this);
- mSocket = socket;
}
LLPluginMessagePipe::~LLPluginMessagePipe()
@@ -114,6 +117,7 @@ LLPluginMessagePipe::~LLPluginMessagePipe()
bool LLPluginMessagePipe::addMessage(const std::string &message)
{
// queue the message for later output
+ LLMutexLock lock(&mOutputMutex);
mOutput += message;
mOutput += MESSAGE_DELIMITER; // message separator
@@ -149,6 +153,18 @@ void LLPluginMessagePipe::setSocketTimeout(apr_interval_time_t timeout_usec)
bool LLPluginMessagePipe::pump(F64 timeout)
{
+ bool result = pumpOutput();
+
+ if(result)
+ {
+ result = pumpInput(timeout);
+ }
+
+ return result;
+}
+
+bool LLPluginMessagePipe::pumpOutput()
+{
bool result = true;
if(mSocket)
@@ -156,6 +172,7 @@ bool LLPluginMessagePipe::pump(F64 timeout)
apr_status_t status;
apr_size_t size;
+ LLMutexLock lock(&mOutputMutex);
if(!mOutput.empty())
{
// write any outgoing messages
@@ -183,6 +200,17 @@ bool LLPluginMessagePipe::pump(F64 timeout)
// remove the written part from the buffer and try again later.
mOutput = mOutput.substr(size);
}
+ else if(APR_STATUS_IS_EOF(status))
+ {
+ // This is what we normally expect when a plugin exits.
+ LL_INFOS("PluginSocket") << "Got EOF from plugin socket." << LL_ENDL;
+
+ if(mOwner)
+ {
+ mOwner->socketError(status);
+ }
+ result = false;
+ }
else
{
// some other error
@@ -196,6 +224,19 @@ bool LLPluginMessagePipe::pump(F64 timeout)
result = false;
}
}
+ }
+
+ return result;
+}
+
+bool LLPluginMessagePipe::pumpInput(F64 timeout)
+{
+ bool result = true;
+
+ if(mSocket)
+ {
+ apr_status_t status;
+ apr_size_t size;
// FIXME: For some reason, the apr timeout stuff isn't working properly on windows.
// Until such time as we figure out why, don't try to use the socket timeout -- just sleep here instead.
@@ -216,8 +257,16 @@ bool LLPluginMessagePipe::pump(F64 timeout)
char input_buf[1024];
apr_size_t request_size;
- // Start out by reading one byte, so that any data received will wake us up.
- request_size = 1;
+ if(timeout == 0.0f)
+ {
+ // If we have no timeout, start out with a full read.
+ request_size = sizeof(input_buf);
+ }
+ else
+ {
+ // Start out by reading one byte, so that any data received will wake us up.
+ request_size = 1;
+ }
// and use the timeout so we'll sleep if no data is available.
setSocketTimeout((apr_interval_time_t)(timeout * 1000000));
@@ -236,11 +285,14 @@ bool LLPluginMessagePipe::pump(F64 timeout)
// LL_INFOS("Plugin") << "after apr_socket_recv, size = " << size << LL_ENDL;
if(size > 0)
+ {
+ LLMutexLock lock(&mInputMutex);
mInput.append(input_buf, size);
+ }
if(status == APR_SUCCESS)
{
-// llinfos << "success, read " << size << llendl;
+ LL_DEBUGS("PluginSocket") << "success, read " << size << LL_ENDL;
if(size != request_size)
{
@@ -250,16 +302,28 @@ bool LLPluginMessagePipe::pump(F64 timeout)
}
else if(APR_STATUS_IS_TIMEUP(status))
{
-// llinfos << "TIMEUP, read " << size << llendl;
+ LL_DEBUGS("PluginSocket") << "TIMEUP, read " << size << LL_ENDL;
// Timeout was hit. Since the initial read is 1 byte, this should never be a partial read.
break;
}
else if(APR_STATUS_IS_EAGAIN(status))
{
-// llinfos << "EAGAIN, read " << size << llendl;
+ LL_DEBUGS("PluginSocket") << "EAGAIN, read " << size << LL_ENDL;
- // We've been doing partial reads, and we're done now.
+ // Non-blocking read returned immediately.
+ break;
+ }
+ else if(APR_STATUS_IS_EOF(status))
+ {
+ // This is what we normally expect when a plugin exits.
+ LL_INFOS("PluginSocket") << "Got EOF from plugin socket. " << LL_ENDL;
+
+ if(mOwner)
+ {
+ mOwner->socketError(status);
+ }
+ result = false;
break;
}
else
@@ -276,22 +340,18 @@ bool LLPluginMessagePipe::pump(F64 timeout)
break;
}
- // Second and subsequent reads should not use the timeout
- setSocketTimeout(0);
- // and should try to fill the input buffer
- request_size = sizeof(input_buf);
+ if(timeout != 0.0f)
+ {
+ // Second and subsequent reads should not use the timeout
+ setSocketTimeout(0);
+ // and should try to fill the input buffer
+ request_size = sizeof(input_buf);
+ }
}
processInput();
}
}
-
- if(!result)
- {
- // If we got an error, we're done.
- LL_INFOS("Plugin") << "Error from socket, cleaning up." << LL_ENDL;
- delete this;
- }
return result;
}
@@ -300,6 +360,7 @@ void LLPluginMessagePipe::processInput(void)
{
// Look for input delimiter(s) in the input buffer.
int delim;
+ mInputMutex.lock();
while((delim = mInput.find(MESSAGE_DELIMITER)) != std::string::npos)
{
// Let the owner process this message
@@ -310,12 +371,15 @@ void LLPluginMessagePipe::processInput(void)
// and this guarantees that the messages will get dequeued correctly.
std::string message(mInput, 0, delim);
mInput.erase(0, delim + 1);
+ mInputMutex.unlock();
mOwner->receiveMessageRaw(message);
+ mInputMutex.lock();
}
else
{
LL_WARNS("Plugin") << "!mOwner" << LL_ENDL;
}
}
+ mInputMutex.unlock();
}
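
The processInput() change above is a classic unlock-around-callback pattern: the input mutex guards the buffer, but is released while the owner handles each message so the handler can safely call back into the pipe (e.g. to queue a reply). A minimal standalone sketch of that pattern, with std::mutex standing in for LLMutex and an illustrative deliver callback (these names are not part of the viewer API):

    #include <functional>
    #include <mutex>
    #include <string>

    // Assumption: messages are separated by a single delimiter byte, as with
    // LLPluginMessagePipe's MESSAGE_DELIMITER.
    static const char DELIMITER = '\0';

    void drainBuffer(std::mutex &input_mutex, std::string &input,
                     const std::function<void(const std::string &)> &deliver)
    {
        input_mutex.lock();
        std::string::size_type delim;
        while((delim = input.find(DELIMITER)) != std::string::npos)
        {
            // Copy the message out and erase it while still holding the lock,
            // so a concurrent reader appending to 'input' can't interleave.
            std::string message(input, 0, delim);
            input.erase(0, delim + 1);

            // Release the lock around the callback; holding it here could
            // deadlock if the handler re-enters the pipe.
            input_mutex.unlock();
            deliver(message);
            input_mutex.lock();
        }
        input_mutex.unlock();
    }
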
diff --git a/indra/llplugin/llpluginmessagepipe.h b/indra/llplugin/llpluginmessagepipe.h
index 1ddb38de68..1b0a08254b 100644
--- a/indra/llplugin/llpluginmessagepipe.h
+++ b/indra/llplugin/llpluginmessagepipe.h
@@ -35,6 +35,7 @@
#define LL_LLPLUGINMESSAGEPIPE_H
#include "lliosocket.h"
+#include "llthread.h"
class LLPluginMessagePipe;
@@ -51,7 +52,7 @@ public:
virtual apr_status_t socketError(apr_status_t error);
// called from LLPluginMessagePipe to manage the connection with LLPluginMessagePipeOwner -- do not use!
- virtual void setMessagePipe(LLPluginMessagePipe *message_pipe) ;
+ virtual void setMessagePipe(LLPluginMessagePipe *message_pipe);
protected:
// returns false if writeMessageRaw() would drop the message
@@ -76,14 +77,18 @@ public:
void clearOwner(void);
bool pump(F64 timeout = 0.0f);
-
+ bool pumpOutput();
+ bool pumpInput(F64 timeout = 0.0f);
+
protected:
void processInput(void);
// used internally by pump()
void setSocketTimeout(apr_interval_time_t timeout_usec);
+ LLMutex mInputMutex;
std::string mInput;
+ LLMutex mOutputMutex;
std::string mOutput;
LLPluginMessagePipeOwner *mOwner;
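
Splitting pump() into pumpOutput() and pumpInput() lets the two halves run on different threads: writes stay on the main thread while a poll thread services reads. A hedged sketch of that division of labor (serviceMessagePipe and polled_input are illustrative names; the pipe methods are the ones declared above):

    #include "llpluginmessagepipe.h"

    void serviceMessagePipe(LLPluginMessagePipe *pipe, bool polled_input)
    {
        if(!pipe)
        {
            return;
        }

        // Always safe from the main thread: mOutputMutex serializes writers.
        pipe->pumpOutput();

        // Read here only when no poll thread owns the input side, mirroring
        // the mPolledInput check in LLPluginProcessParent::idle().
        if(!polled_input)
        {
            pipe->pumpInput(); // default timeout of 0.0f: non-blocking drain
        }
    }
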
diff --git a/indra/llplugin/llpluginprocesschild.cpp b/indra/llplugin/llpluginprocesschild.cpp
index 2d078cd6ed..d1cf91b253 100644
--- a/indra/llplugin/llpluginprocesschild.cpp
+++ b/indra/llplugin/llpluginprocesschild.cpp
@@ -85,9 +85,14 @@ void LLPluginProcessChild::idle(void)
bool idle_again;
do
{
- if(mSocketError != APR_SUCCESS)
+ if(APR_STATUS_IS_EOF(mSocketError))
{
- LL_INFOS("Plugin") << "message pipe is in error state, moving to STATE_ERROR"<< LL_ENDL;
+ // Plugin socket was closed. This covers both normal plugin termination and host crashes.
+ setState(STATE_ERROR);
+ }
+ else if(mSocketError != APR_SUCCESS)
+ {
+ LL_INFOS("Plugin") << "message pipe is in error state (" << mSocketError << "), moving to STATE_ERROR"<< LL_ENDL;
setState(STATE_ERROR);
}
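
Both parent and child now treat EOF as a distinct, expected condition (the other end closed its socket) rather than a generic failure. Reduced to plain APR, the classification looks roughly like this (classifySocketStatus is an illustrative name, not viewer code):

    #include <apr_errno.h>

    // Returns true if the connection should transition to an error/shutdown state.
    bool classifySocketStatus(apr_status_t status)
    {
        if(status == APR_SUCCESS)
        {
            return false;   // healthy
        }
        if(APR_STATUS_IS_EOF(status))
        {
            return true;    // peer closed cleanly -- the normal exit path
        }
        if(APR_STATUS_IS_EAGAIN(status) || APR_STATUS_IS_TIMEUP(status))
        {
            return false;   // transient: no data available right now
        }
        return true;        // anything else is a real socket error
    }
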
diff --git a/indra/llplugin/llpluginprocessparent.cpp b/indra/llplugin/llpluginprocessparent.cpp
index e273410a1d..3589b22a77 100644
--- a/indra/llplugin/llpluginprocessparent.cpp
+++ b/indra/llplugin/llpluginprocessparent.cpp
@@ -45,8 +45,51 @@ LLPluginProcessParentOwner::~LLPluginProcessParentOwner()
}
-LLPluginProcessParent::LLPluginProcessParent(LLPluginProcessParentOwner *owner)
+bool LLPluginProcessParent::sUseReadThread = false;
+apr_pollset_t *LLPluginProcessParent::sPollSet = NULL;
+bool LLPluginProcessParent::sPollsetNeedsRebuild = false;
+LLMutex *LLPluginProcessParent::sInstancesMutex;
+std::list<LLPluginProcessParent*> LLPluginProcessParent::sInstances;
+LLThread *LLPluginProcessParent::sReadThread = NULL;
+
+
+class LLPluginProcessParentPollThread: public LLThread
{
+public:
+ LLPluginProcessParentPollThread() :
+ LLThread("LLPluginProcessParentPollThread", gAPRPoolp)
+ {
+ }
+protected:
+ // Inherited from LLThread
+ /*virtual*/ void run(void)
+ {
+ while(!isQuitting() && LLPluginProcessParent::getUseReadThread())
+ {
+ LLPluginProcessParent::poll(0.1f);
+ checkPause();
+ }
+
+ // Final poll to clean up the pollset, etc.
+ LLPluginProcessParent::poll(0.0f);
+ }
+
+ // Inherited from LLThread
+ /*virtual*/ bool runCondition(void)
+ {
+ return(LLPluginProcessParent::canPollThreadRun());
+ }
+
+};
+
+LLPluginProcessParent::LLPluginProcessParent(LLPluginProcessParentOwner *owner):
+ mIncomingQueueMutex(gAPRPoolp)
+{
+ if(!sInstancesMutex)
+ {
+ sInstancesMutex = new LLMutex(gAPRPoolp);
+ }
+
mOwner = owner;
mBoundPort = 0;
mState = STATE_UNINITIALIZED;
@@ -55,18 +98,36 @@ LLPluginProcessParent::LLPluginProcessParent(LLPluginProcessParentOwner *owner)
mDisableTimeout = false;
mDebug = false;
mBlocked = false;
+ mPolledInput = false;
+ mPollFD.client_data = NULL;
mPluginLaunchTimeout = 60.0f;
mPluginLockupTimeout = 15.0f;
// Don't start the timer here -- start it when we actually launch the plugin process.
mHeartbeat.stop();
+
+ // Don't add to the global list until fully constructed.
+ {
+ LLMutexLock lock(sInstancesMutex);
+ sInstances.push_back(this);
+ }
}
LLPluginProcessParent::~LLPluginProcessParent()
{
LL_DEBUGS("Plugin") << "destructor" << LL_ENDL;
+ // Remove from the global list before beginning destruction.
+ {
+ // Make sure to get the global mutex _first_ here, to avoid a possible deadlock against LLPluginProcessParent::poll()
+ LLMutexLock lock(sInstancesMutex);
+ {
+ LLMutexLock lock2(&mIncomingQueueMutex);
+ sInstances.remove(this);
+ }
+ }
+
// Destroy any remaining shared memory regions
sharedMemoryRegionsType::iterator iter;
while((iter = mSharedMemoryRegions.begin()) != mSharedMemoryRegions.end())
@@ -78,15 +139,17 @@ LLPluginProcessParent::~LLPluginProcessParent()
mSharedMemoryRegions.erase(iter);
}
- // orphaning the process means it won't be killed when the LLProcessLauncher is destructed.
- // This is what we want -- it should exit cleanly once it notices the sockets have been closed.
- mProcess.orphan();
+ mProcess.kill();
killSockets();
}
void LLPluginProcessParent::killSockets(void)
{
- killMessagePipe();
+ {
+ LLMutexLock lock(&mIncomingQueueMutex);
+ killMessagePipe();
+ }
+
mListenSocket.reset();
mSocket.reset();
}
@@ -160,21 +223,47 @@ void LLPluginProcessParent::idle(void)
do
{
+ // process queued messages
+ mIncomingQueueMutex.lock();
+ while(!mIncomingQueue.empty())
+ {
+ LLPluginMessage message = mIncomingQueue.front();
+ mIncomingQueue.pop();
+ mIncomingQueueMutex.unlock();
+
+ receiveMessage(message);
+
+ mIncomingQueueMutex.lock();
+ }
+
+ mIncomingQueueMutex.unlock();
+
// Give time to network processing
if(mMessagePipe)
{
- if(!mMessagePipe->pump())
+ // Drain any queued outgoing messages
+ mMessagePipe->pumpOutput();
+
+ // Only do input processing here if this instance isn't in a pollset.
+ if(!mPolledInput)
{
-// LL_WARNS("Plugin") << "Message pipe hit an error state" << LL_ENDL;
- errorState();
+ mMessagePipe->pumpInput();
}
}
-
- if((mSocketError != APR_SUCCESS) && (mState <= STATE_RUNNING))
+
+ if(mState <= STATE_RUNNING)
{
- // The socket is in an error state -- the plugin is gone.
- LL_WARNS("Plugin") << "Socket hit an error state (" << mSocketError << ")" << LL_ENDL;
- errorState();
+ if(APR_STATUS_IS_EOF(mSocketError))
+ {
+ // Plugin socket was closed. This covers both normal plugin termination and plugin crashes.
+ errorState();
+ }
+ else if(mSocketError != APR_SUCCESS)
+ {
+ // The socket is in an error state -- the plugin is gone.
+ LL_WARNS("Plugin") << "Socket hit an error state (" << mSocketError << ")" << LL_ENDL;
+ errorState();
+ }
}
// If a state needs to go directly to another state (as a performance enhancement), it can set idle_again to true after calling setState().
@@ -355,7 +444,7 @@ void LLPluginProcessParent::idle(void)
break;
case STATE_HELLO:
- LL_DEBUGS("Plugin") << "received hello message" << llendl;
+ LL_DEBUGS("Plugin") << "received hello message" << LL_ENDL;
// Send the message to load the plugin
{
@@ -389,7 +478,7 @@ void LLPluginProcessParent::idle(void)
}
else if(pluginLockedUp())
{
- LL_WARNS("Plugin") << "timeout in exiting state, bailing out" << llendl;
+ LL_WARNS("Plugin") << "timeout in exiting state, bailing out" << LL_ENDL;
errorState();
}
break;
@@ -411,8 +500,7 @@ void LLPluginProcessParent::idle(void)
break;
case STATE_CLEANUP:
- // Don't do a kill here anymore -- closing the sockets is the new 'kill'.
- mProcess.orphan();
+ mProcess.kill();
killSockets();
setState(STATE_DONE);
break;
@@ -491,29 +579,317 @@ void LLPluginProcessParent::sendMessage(const LLPluginMessage &message)
std::string buffer = message.generate();
LL_DEBUGS("Plugin") << "Sending: " << buffer << LL_ENDL;
writeMessageRaw(buffer);
+
+ // Try to send message immediately.
+ if(mMessagePipe)
+ {
+ mMessagePipe->pumpOutput();
+ }
+}
+
+//virtual
+void LLPluginProcessParent::setMessagePipe(LLPluginMessagePipe *message_pipe)
+{
+ bool update_pollset = false;
+
+ if(mMessagePipe)
+ {
+ // Unsetting an existing message pipe -- remove from the pollset
+ mPollFD.client_data = NULL;
+
+ // pollset needs an update
+ update_pollset = true;
+ }
+ if(message_pipe != NULL)
+ {
+ // Set up the apr_pollfd_t
+ mPollFD.p = gAPRPoolp;
+ mPollFD.desc_type = APR_POLL_SOCKET;
+ mPollFD.reqevents = APR_POLLIN|APR_POLLERR|APR_POLLHUP;
+ mPollFD.rtnevents = 0;
+ mPollFD.desc.s = mSocket->getSocket();
+ mPollFD.client_data = (void*)this;
+
+ // pollset needs an update
+ update_pollset = true;
+ }
+
+ mMessagePipe = message_pipe;
+
+ if(update_pollset)
+ {
+ dirtyPollSet();
+ }
+}
+
+//static
+void LLPluginProcessParent::dirtyPollSet()
+{
+ sPollsetNeedsRebuild = true;
+
+ if(sReadThread)
+ {
+ LL_DEBUGS("PluginPoll") << "unpausing read thread " << LL_ENDL;
+ sReadThread->unpause();
+ }
+}
+
+void LLPluginProcessParent::updatePollset()
+{
+ if(!sInstancesMutex)
+ {
+ // No instances have been created yet. There's no work to do.
+ return;
+ }
+
+ LLMutexLock lock(sInstancesMutex);
+
+ if(sPollSet)
+ {
+ LL_DEBUGS("PluginPoll") << "destroying pollset " << sPollSet << LL_ENDL;
+ // delete the existing pollset.
+ apr_pollset_destroy(sPollSet);
+ sPollSet = NULL;
+ }
+
+ std::list<LLPluginProcessParent*>::iterator iter;
+ int count = 0;
+
+ // Count the number of instances that want to be in the pollset
+ for(iter = sInstances.begin(); iter != sInstances.end(); iter++)
+ {
+ (*iter)->mPolledInput = false;
+ if((*iter)->mPollFD.client_data)
+ {
+ // This instance has a socket that needs to be polled.
+ ++count;
+ }
+ }
+
+ if(sUseReadThread && sReadThread && !sReadThread->isQuitting())
+ {
+ if(!sPollSet && (count > 0))
+ {
+#ifdef APR_POLLSET_NOCOPY
+ // The pollset doesn't exist yet. Create it now.
+ apr_status_t status = apr_pollset_create(&sPollSet, count, gAPRPoolp, APR_POLLSET_NOCOPY);
+ if(status != APR_SUCCESS)
+ {
+#endif // APR_POLLSET_NOCOPY
+ LL_WARNS("PluginPoll") << "Couldn't create pollset. Falling back to non-pollset mode." << LL_ENDL;
+ sPollSet = NULL;
+#ifdef APR_POLLSET_NOCOPY
+ }
+ else
+ {
+ LL_DEBUGS("PluginPoll") << "created pollset " << sPollSet << LL_ENDL;
+
+ // Pollset was created, add all instances to it.
+ for(iter = sInstances.begin(); iter != sInstances.end(); iter++)
+ {
+ if((*iter)->mPollFD.client_data)
+ {
+ status = apr_pollset_add(sPollSet, &((*iter)->mPollFD));
+ if(status == APR_SUCCESS)
+ {
+ (*iter)->mPolledInput = true;
+ }
+ else
+ {
+ LL_WARNS("PluginPoll") << "apr_pollset_add failed with status " << status << LL_ENDL;
+ }
+ }
+ }
+ }
+#endif // APR_POLLSET_NOCOPY
+ }
+ }
+}
+
+void LLPluginProcessParent::setUseReadThread(bool use_read_thread)
+{
+ if(sUseReadThread != use_read_thread)
+ {
+ sUseReadThread = use_read_thread;
+
+ if(sUseReadThread)
+ {
+ if(!sReadThread)
+ {
+ // start up the read thread
+ LL_INFOS("PluginPoll") << "creating read thread " << LL_ENDL;
+
+ // make sure the pollset gets rebuilt.
+ sPollsetNeedsRebuild = true;
+
+ sReadThread = new LLPluginProcessParentPollThread;
+ sReadThread->start();
+ }
+ }
+ else
+ {
+ if(sReadThread)
+ {
+ // shut down the read thread
+ LL_INFOS("PluginPoll") << "destroying read thread " << LL_ENDL;
+ delete sReadThread;
+ sReadThread = NULL;
+ }
+ }
+
+ }
+}
+
+void LLPluginProcessParent::poll(F64 timeout)
+{
+ if(sPollsetNeedsRebuild || !sUseReadThread)
+ {
+ sPollsetNeedsRebuild = false;
+ updatePollset();
+ }
+
+ if(sPollSet)
+ {
+ apr_status_t status;
+ apr_int32_t count;
+ const apr_pollfd_t *descriptors;
+ status = apr_pollset_poll(sPollSet, (apr_interval_time_t)(timeout * 1000000), &count, &descriptors);
+ if(status == APR_SUCCESS)
+ {
+ // One or more of the descriptors signalled. Call them.
+ for(int i = 0; i < count; i++)
+ {
+ LLPluginProcessParent *self = (LLPluginProcessParent *)(descriptors[i].client_data);
+ // NOTE: the descriptor returned here is actually a COPY of the original (even though we create the pollset with APR_POLLSET_NOCOPY).
+ // This means that even if the parent has set its mPollFD.client_data to NULL, the old pointer may still be there in this descriptor.
+ // It's even possible that the old pointer no longer points to a valid LLPluginProcessParent.
+ // This means that we can't safely dereference the 'self' pointer here without some extra steps...
+ if(self)
+ {
+ // Make sure this pointer is still in the instances list
+ bool valid = false;
+ {
+ LLMutexLock lock(sInstancesMutex);
+ for(std::list<LLPluginProcessParent*>::iterator iter = sInstances.begin(); iter != sInstances.end(); ++iter)
+ {
+ if(*iter == self)
+ {
+ // Lock the instance's mutex before unlocking the global mutex.
+ // This avoids a possible race condition where the instance gets deleted between this check and the servicePoll() call.
+ self->mIncomingQueueMutex.lock();
+ valid = true;
+ break;
+ }
+ }
+ }
+
+ if(valid)
+ {
+ // The instance is still valid.
+ // Pull incoming messages off the socket
+ self->servicePoll();
+ self->mIncomingQueueMutex.unlock();
+ }
+ else
+ {
+ LL_DEBUGS("PluginPoll") << "detected deleted instance " << self << LL_ENDL;
+ }
+
+ }
+ }
+ }
+ else if(APR_STATUS_IS_TIMEUP(status))
+ {
+ // timed out with no incoming data. Just return.
+ }
+ else if(status == EBADF)
+ {
+ // This happens when one of the file descriptors in the pollset is destroyed, which happens whenever a plugin's socket is closed.
+ // The pollset has been or will be recreated, so just return.
+ LL_DEBUGS("PluginPoll") << "apr_pollset_poll returned EBADF" << LL_ENDL;
+ }
+ else if(status != APR_SUCCESS)
+ {
+ LL_WARNS("PluginPoll") << "apr_pollset_poll failed with status " << status << LL_ENDL;
+ }
+ }
}
+void LLPluginProcessParent::servicePoll()
+{
+ bool result = true;
+
+ // poll signalled on this object's socket. Try to process incoming messages.
+ if(mMessagePipe)
+ {
+ result = mMessagePipe->pumpInput(0.0f);
+ }
+
+ if(!result)
+ {
+ // If we got a read error on input, remove this pipe from the pollset
+ apr_pollset_remove(sPollSet, &mPollFD);
+
+ // and tell the code not to re-add it
+ mPollFD.client_data = NULL;
+ }
+}
void LLPluginProcessParent::receiveMessageRaw(const std::string &message)
{
LL_DEBUGS("Plugin") << "Received: " << message << LL_ENDL;
-
- // FIXME: should this go into a queue instead?
LLPluginMessage parsed;
if(parsed.parse(message) != -1)
{
- receiveMessage(parsed);
+ if(parsed.hasValue("blocking_request"))
+ {
+ mBlocked = true;
+ }
+
+ if(mPolledInput)
+ {
+ // This is being called on the polling thread -- only do minimal processing/queueing.
+ receiveMessageEarly(parsed);
+ }
+ else
+ {
+ // This is not being called on the polling thread -- do full message processing at this time.
+ receiveMessage(parsed);
+ }
}
}
-void LLPluginProcessParent::receiveMessage(const LLPluginMessage &message)
+void LLPluginProcessParent::receiveMessageEarly(const LLPluginMessage &message)
{
- if(message.hasValue("blocking_request"))
+ // NOTE: this function will be called from the polling thread. It will be called with mIncomingQueueMutex _already locked_.
+
+ bool handled = false;
+
+ std::string message_class = message.getClass();
+ if(message_class == LLPLUGIN_MESSAGE_CLASS_INTERNAL)
+ {
+ // no internal messages need to be handled early.
+ }
+ else
{
- mBlocked = true;
+ // Call out to the owner and see if they want to reply
+ // TODO: Should this only happen when blocked?
+ if(mOwner != NULL)
+ {
+ handled = mOwner->receivePluginMessageEarly(message);
+ }
}
+
+ if(!handled)
+ {
+ // any message that wasn't handled early needs to be queued.
+ mIncomingQueue.push(message);
+ }
+}
+void LLPluginProcessParent::receiveMessage(const LLPluginMessage &message)
+{
std::string message_class = message.getClass();
if(message_class == LLPLUGIN_MESSAGE_CLASS_INTERNAL)
{
@@ -704,12 +1080,12 @@ bool LLPluginProcessParent::pluginLockedUpOrQuit()
if(!mProcess.isRunning())
{
- LL_WARNS("Plugin") << "child exited" << llendl;
+ LL_WARNS("Plugin") << "child exited" << LL_ENDL;
result = true;
}
else if(pluginLockedUp())
{
- LL_WARNS("Plugin") << "timeout" << llendl;
+ LL_WARNS("Plugin") << "timeout" << LL_ENDL;
result = true;
}
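
For orientation, the pollset machinery that updatePollset() and poll() implement above reduces to four APR calls: create, add, poll, destroy. A self-contained sketch under the assumption of a caller-provided descriptor array (pollOnce and fds are illustrative names; error handling is elided):

    #include <apr_poll.h>
    #include <apr_pools.h>

    void pollOnce(apr_pool_t *pool, apr_pollfd_t *fds, int count, apr_interval_time_t usec)
    {
        apr_pollset_t *pollset = NULL;

        // APR_POLLSET_NOCOPY asks APR to keep pointers to our apr_pollfd_t
        // structs rather than copying them, so 'fds' must outlive the pollset.
        if(apr_pollset_create(&pollset, count, pool, APR_POLLSET_NOCOPY) != APR_SUCCESS)
        {
            return;
        }

        for(int i = 0; i < count; i++)
        {
            apr_pollset_add(pollset, &fds[i]);
        }

        apr_int32_t signalled = 0;
        const apr_pollfd_t *descriptors = NULL;
        if(apr_pollset_poll(pollset, usec, &signalled, &descriptors) == APR_SUCCESS)
        {
            for(apr_int32_t i = 0; i < signalled; i++)
            {
                // client_data round-trips whatever the caller stored in the
                // pollfd; the diff above stores the owning LLPluginProcessParent
                // and re-validates it against sInstances before use.
                void *owner = descriptors[i].client_data;
                (void)owner; // dispatch to the owner here
            }
        }

        apr_pollset_destroy(pollset);
    }
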
diff --git a/indra/llplugin/llpluginprocessparent.h b/indra/llplugin/llpluginprocessparent.h
index 31f125bfb3..4dff835b6a 100644
--- a/indra/llplugin/llpluginprocessparent.h
+++ b/indra/llplugin/llpluginprocessparent.h
@@ -41,12 +41,14 @@
#include "llpluginsharedmemory.h"
#include "lliosocket.h"
+#include "llthread.h"
class LLPluginProcessParentOwner
{
public:
virtual ~LLPluginProcessParentOwner();
virtual void receivePluginMessage(const LLPluginMessage &message) = 0;
+ virtual bool receivePluginMessageEarly(const LLPluginMessage &message) {return false;};
// This will only be called when the plugin has died unexpectedly
virtual void pluginLaunchFailed() {};
virtual void pluginDied() {};
@@ -90,7 +92,9 @@ public:
void receiveMessage(const LLPluginMessage &message);
// Inherited from LLPluginMessagePipeOwner
- void receiveMessageRaw(const std::string &message);
+ /*virtual*/ void receiveMessageRaw(const std::string &message);
+ /*virtual*/ void receiveMessageEarly(const LLPluginMessage &message);
+ /*virtual*/ void setMessagePipe(LLPluginMessagePipe *message_pipe);
// This adds a memory segment shared with the client, generating a name for the segment. The name generated is guaranteed to be unique on the host.
// The caller must call removeSharedMemory first (and wait until getSharedMemorySize returns 0 for the indicated name) before re-adding a segment with the same name.
@@ -113,7 +117,11 @@ public:
void setLockupTimeout(F32 timeout) { mPluginLockupTimeout = timeout; };
F64 getCPUUsage() { return mCPUUsage; };
-
+
+ static void poll(F64 timeout);
+ static bool canPollThreadRun() { return (sPollSet || sPollsetNeedsRebuild || sUseReadThread); };
+ static void setUseReadThread(bool use_read_thread);
+ static bool getUseReadThread() { return sUseReadThread; };
private:
enum EState
@@ -164,12 +172,26 @@ private:
bool mDisableTimeout;
bool mDebug;
bool mBlocked;
+ bool mPolledInput;
LLProcessLauncher mDebugger;
F32 mPluginLaunchTimeout; // Somewhat longer timeout for initial launch.
F32 mPluginLockupTimeout; // If we don't receive a heartbeat in this many seconds, we declare the plugin locked up.
+ static bool sUseReadThread;
+ apr_pollfd_t mPollFD;
+ static apr_pollset_t *sPollSet;
+ static bool sPollsetNeedsRebuild;
+ static LLMutex *sInstancesMutex;
+ static std::list<LLPluginProcessParent*> sInstances;
+ static void dirtyPollSet();
+ static void updatePollset();
+ void servicePoll();
+ static LLThread *sReadThread;
+
+ LLMutex mIncomingQueueMutex;
+ std::queue<LLPluginMessage> mIncomingQueue;
};
#endif // LL_LLPLUGINPROCESSPARENT_H
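
A sketch of how a host application might drive the new static interface (the wrapper functions are illustrative; the setUseReadThread()/poll() calls are the ones declared above):

    #include "llpluginprocessparent.h"

    void enableThreadedPluginInput()
    {
        // Lazily starts LLPluginProcessParentPollThread, which loops on
        // poll(0.1f) and drains plugin sockets off the main thread.
        LLPluginProcessParent::setUseReadThread(true);
    }

    void disableThreadedPluginInput()
    {
        // Stops and deletes the read thread; its run() issues one final
        // poll(0.0f) to tear down the shared pollset. Each instance then
        // falls back to pumping input from idle() on the main thread.
        LLPluginProcessParent::setUseReadThread(false);
    }
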