summaryrefslogtreecommitdiff
path: root/indra/llplugin/llpluginprocessparent.cpp
diff options
context:
space:
mode:
authorTofu Linden <tofu.linden@lindenlab.com>2010-04-30 10:10:16 +0100
committerTofu Linden <tofu.linden@lindenlab.com>2010-04-30 10:10:16 +0100
commit6f339d5e9eb0a43f70effdf069d414e0ee7fff2c (patch)
treec8070823806fbeb68ea0813d9a766a1b1db458f2 /indra/llplugin/llpluginprocessparent.cpp
parent4f46e777a501ff0bfedaf657ac6decdeba8acbe5 (diff)
parentd597a7e36a0a8c4cbee70b053c41aa01afeab6b5 (diff)
merge from viewer-trunk
Diffstat (limited to 'indra/llplugin/llpluginprocessparent.cpp')
-rw-r--r--indra/llplugin/llpluginprocessparent.cpp426
1 files changed, 401 insertions, 25 deletions
diff --git a/indra/llplugin/llpluginprocessparent.cpp b/indra/llplugin/llpluginprocessparent.cpp
index e273410a1d..3589b22a77 100644
--- a/indra/llplugin/llpluginprocessparent.cpp
+++ b/indra/llplugin/llpluginprocessparent.cpp
@@ -45,8 +45,51 @@ LLPluginProcessParentOwner::~LLPluginProcessParentOwner()
}
-LLPluginProcessParent::LLPluginProcessParent(LLPluginProcessParentOwner *owner)
+bool LLPluginProcessParent::sUseReadThread = false;
+apr_pollset_t *LLPluginProcessParent::sPollSet = NULL;
+bool LLPluginProcessParent::sPollsetNeedsRebuild = false;
+LLMutex *LLPluginProcessParent::sInstancesMutex;
+std::list<LLPluginProcessParent*> LLPluginProcessParent::sInstances;
+LLThread *LLPluginProcessParent::sReadThread = NULL;
+
+
+class LLPluginProcessParentPollThread: public LLThread
{
+public:
+ LLPluginProcessParentPollThread() :
+ LLThread("LLPluginProcessParentPollThread", gAPRPoolp)
+ {
+ }
+protected:
+ // Inherited from LLThread
+ /*virtual*/ void run(void)
+ {
+ while(!isQuitting() && LLPluginProcessParent::getUseReadThread())
+ {
+ LLPluginProcessParent::poll(0.1f);
+ checkPause();
+ }
+
+ // Final poll to clean up the pollset, etc.
+ LLPluginProcessParent::poll(0.0f);
+ }
+
+ // Inherited from LLThread
+ /*virtual*/ bool runCondition(void)
+ {
+ return(LLPluginProcessParent::canPollThreadRun());
+ }
+
+};
+
+LLPluginProcessParent::LLPluginProcessParent(LLPluginProcessParentOwner *owner):
+ mIncomingQueueMutex(gAPRPoolp)
+{
+ if(!sInstancesMutex)
+ {
+ sInstancesMutex = new LLMutex(gAPRPoolp);
+ }
+
mOwner = owner;
mBoundPort = 0;
mState = STATE_UNINITIALIZED;
@@ -55,18 +98,36 @@ LLPluginProcessParent::LLPluginProcessParent(LLPluginProcessParentOwner *owner)
mDisableTimeout = false;
mDebug = false;
mBlocked = false;
+ mPolledInput = false;
+ mPollFD.client_data = NULL;
mPluginLaunchTimeout = 60.0f;
mPluginLockupTimeout = 15.0f;
// Don't start the timer here -- start it when we actually launch the plugin process.
mHeartbeat.stop();
+
+ // Don't add to the global list until fully constructed.
+ {
+ LLMutexLock lock(sInstancesMutex);
+ sInstances.push_back(this);
+ }
}
LLPluginProcessParent::~LLPluginProcessParent()
{
LL_DEBUGS("Plugin") << "destructor" << LL_ENDL;
+ // Remove from the global list before beginning destruction.
+ {
+ // Make sure to get the global mutex _first_ here, to avoid a possible deadlock against LLPluginProcessParent::poll()
+ LLMutexLock lock(sInstancesMutex);
+ {
+ LLMutexLock lock2(&mIncomingQueueMutex);
+ sInstances.remove(this);
+ }
+ }
+
// Destroy any remaining shared memory regions
sharedMemoryRegionsType::iterator iter;
while((iter = mSharedMemoryRegions.begin()) != mSharedMemoryRegions.end())
@@ -78,15 +139,17 @@ LLPluginProcessParent::~LLPluginProcessParent()
mSharedMemoryRegions.erase(iter);
}
- // orphaning the process means it won't be killed when the LLProcessLauncher is destructed.
- // This is what we want -- it should exit cleanly once it notices the sockets have been closed.
- mProcess.orphan();
+ mProcess.kill();
killSockets();
}
void LLPluginProcessParent::killSockets(void)
{
- killMessagePipe();
+ {
+ LLMutexLock lock(&mIncomingQueueMutex);
+ killMessagePipe();
+ }
+
mListenSocket.reset();
mSocket.reset();
}
@@ -160,21 +223,47 @@ void LLPluginProcessParent::idle(void)
do
{
+ // process queued messages
+ mIncomingQueueMutex.lock();
+ while(!mIncomingQueue.empty())
+ {
+ LLPluginMessage message = mIncomingQueue.front();
+ mIncomingQueue.pop();
+ mIncomingQueueMutex.unlock();
+
+ receiveMessage(message);
+
+ mIncomingQueueMutex.lock();
+ }
+
+ mIncomingQueueMutex.unlock();
+
// Give time to network processing
if(mMessagePipe)
{
- if(!mMessagePipe->pump())
+ // Drain any queued outgoing messages
+ mMessagePipe->pumpOutput();
+
+ // Only do input processing here if this instance isn't in a pollset.
+ if(!mPolledInput)
{
-// LL_WARNS("Plugin") << "Message pipe hit an error state" << LL_ENDL;
- errorState();
+ mMessagePipe->pumpInput();
}
}
-
- if((mSocketError != APR_SUCCESS) && (mState <= STATE_RUNNING))
+
+ if(mState <= STATE_RUNNING)
{
- // The socket is in an error state -- the plugin is gone.
- LL_WARNS("Plugin") << "Socket hit an error state (" << mSocketError << ")" << LL_ENDL;
- errorState();
+ if(APR_STATUS_IS_EOF(mSocketError))
+ {
+ // Plugin socket was closed. This covers both normal plugin termination and plugin crashes.
+ errorState();
+ }
+ else if(mSocketError != APR_SUCCESS)
+ {
+ // The socket is in an error state -- the plugin is gone.
+ LL_WARNS("Plugin") << "Socket hit an error state (" << mSocketError << ")" << LL_ENDL;
+ errorState();
+ }
}
// If a state needs to go directly to another state (as a performance enhancement), it can set idle_again to true after calling setState().
@@ -355,7 +444,7 @@ void LLPluginProcessParent::idle(void)
break;
case STATE_HELLO:
- LL_DEBUGS("Plugin") << "received hello message" << llendl;
+ LL_DEBUGS("Plugin") << "received hello message" << LL_ENDL;
// Send the message to load the plugin
{
@@ -389,7 +478,7 @@ void LLPluginProcessParent::idle(void)
}
else if(pluginLockedUp())
{
- LL_WARNS("Plugin") << "timeout in exiting state, bailing out" << llendl;
+ LL_WARNS("Plugin") << "timeout in exiting state, bailing out" << LL_ENDL;
errorState();
}
break;
@@ -411,8 +500,7 @@ void LLPluginProcessParent::idle(void)
break;
case STATE_CLEANUP:
- // Don't do a kill here anymore -- closing the sockets is the new 'kill'.
- mProcess.orphan();
+ mProcess.kill();
killSockets();
setState(STATE_DONE);
break;
@@ -491,29 +579,317 @@ void LLPluginProcessParent::sendMessage(const LLPluginMessage &message)
std::string buffer = message.generate();
LL_DEBUGS("Plugin") << "Sending: " << buffer << LL_ENDL;
writeMessageRaw(buffer);
+
+ // Try to send message immediately.
+ if(mMessagePipe)
+ {
+ mMessagePipe->pumpOutput();
+ }
+}
+
+//virtual
+void LLPluginProcessParent::setMessagePipe(LLPluginMessagePipe *message_pipe)
+{
+ bool update_pollset = false;
+
+ if(mMessagePipe)
+ {
+ // Unsetting an existing message pipe -- remove from the pollset
+ mPollFD.client_data = NULL;
+
+ // pollset needs an update
+ update_pollset = true;
+ }
+ if(message_pipe != NULL)
+ {
+ // Set up the apr_pollfd_t
+ mPollFD.p = gAPRPoolp;
+ mPollFD.desc_type = APR_POLL_SOCKET;
+ mPollFD.reqevents = APR_POLLIN|APR_POLLERR|APR_POLLHUP;
+ mPollFD.rtnevents = 0;
+ mPollFD.desc.s = mSocket->getSocket();
+ mPollFD.client_data = (void*)this;
+
+ // pollset needs an update
+ update_pollset = true;
+ }
+
+ mMessagePipe = message_pipe;
+
+ if(update_pollset)
+ {
+ dirtyPollSet();
+ }
+}
+
+//static
+void LLPluginProcessParent::dirtyPollSet()
+{
+ sPollsetNeedsRebuild = true;
+
+ if(sReadThread)
+ {
+ LL_DEBUGS("PluginPoll") << "unpausing read thread " << LL_ENDL;
+ sReadThread->unpause();
+ }
+}
+
+void LLPluginProcessParent::updatePollset()
+{
+ if(!sInstancesMutex)
+ {
+ // No instances have been created yet. There's no work to do.
+ return;
+ }
+
+ LLMutexLock lock(sInstancesMutex);
+
+ if(sPollSet)
+ {
+ LL_DEBUGS("PluginPoll") << "destroying pollset " << sPollSet << LL_ENDL;
+ // delete the existing pollset.
+ apr_pollset_destroy(sPollSet);
+ sPollSet = NULL;
+ }
+
+ std::list<LLPluginProcessParent*>::iterator iter;
+ int count = 0;
+
+ // Count the number of instances that want to be in the pollset
+ for(iter = sInstances.begin(); iter != sInstances.end(); iter++)
+ {
+ (*iter)->mPolledInput = false;
+ if((*iter)->mPollFD.client_data)
+ {
+ // This instance has a socket that needs to be polled.
+ ++count;
+ }
+ }
+
+ if(sUseReadThread && sReadThread && !sReadThread->isQuitting())
+ {
+ if(!sPollSet && (count > 0))
+ {
+#ifdef APR_POLLSET_NOCOPY
+ // The pollset doesn't exist yet. Create it now.
+ apr_status_t status = apr_pollset_create(&sPollSet, count, gAPRPoolp, APR_POLLSET_NOCOPY);
+ if(status != APR_SUCCESS)
+ {
+#endif // APR_POLLSET_NOCOPY
+ LL_WARNS("PluginPoll") << "Couldn't create pollset. Falling back to non-pollset mode." << LL_ENDL;
+ sPollSet = NULL;
+#ifdef APR_POLLSET_NOCOPY
+ }
+ else
+ {
+ LL_DEBUGS("PluginPoll") << "created pollset " << sPollSet << LL_ENDL;
+
+ // Pollset was created, add all instances to it.
+ for(iter = sInstances.begin(); iter != sInstances.end(); iter++)
+ {
+ if((*iter)->mPollFD.client_data)
+ {
+ status = apr_pollset_add(sPollSet, &((*iter)->mPollFD));
+ if(status == APR_SUCCESS)
+ {
+ (*iter)->mPolledInput = true;
+ }
+ else
+ {
+ LL_WARNS("PluginPoll") << "apr_pollset_add failed with status " << status << LL_ENDL;
+ }
+ }
+ }
+ }
+#endif // APR_POLLSET_NOCOPY
+ }
+ }
+}
+
+void LLPluginProcessParent::setUseReadThread(bool use_read_thread)
+{
+ if(sUseReadThread != use_read_thread)
+ {
+ sUseReadThread = use_read_thread;
+
+ if(sUseReadThread)
+ {
+ if(!sReadThread)
+ {
+ // start up the read thread
+ LL_INFOS("PluginPoll") << "creating read thread " << LL_ENDL;
+
+ // make sure the pollset gets rebuilt.
+ sPollsetNeedsRebuild = true;
+
+ sReadThread = new LLPluginProcessParentPollThread;
+ sReadThread->start();
+ }
+ }
+ else
+ {
+ if(sReadThread)
+ {
+ // shut down the read thread
+ LL_INFOS("PluginPoll") << "destroying read thread " << LL_ENDL;
+ delete sReadThread;
+ sReadThread = NULL;
+ }
+ }
+
+ }
+}
+
+void LLPluginProcessParent::poll(F64 timeout)
+{
+ if(sPollsetNeedsRebuild || !sUseReadThread)
+ {
+ sPollsetNeedsRebuild = false;
+ updatePollset();
+ }
+
+ if(sPollSet)
+ {
+ apr_status_t status;
+ apr_int32_t count;
+ const apr_pollfd_t *descriptors;
+ status = apr_pollset_poll(sPollSet, (apr_interval_time_t)(timeout * 1000000), &count, &descriptors);
+ if(status == APR_SUCCESS)
+ {
+ // One or more of the descriptors signalled. Call them.
+ for(int i = 0; i < count; i++)
+ {
+ LLPluginProcessParent *self = (LLPluginProcessParent *)(descriptors[i].client_data);
+ // NOTE: the descriptor returned here is actually a COPY of the original (even though we create the pollset with APR_POLLSET_NOCOPY).
+ // This means that even if the parent has set its mPollFD.client_data to NULL, the old pointer may still there in this descriptor.
+ // It's even possible that the old pointer no longer points to a valid LLPluginProcessParent.
+ // This means that we can't safely dereference the 'self' pointer here without some extra steps...
+ if(self)
+ {
+ // Make sure this pointer is still in the instances list
+ bool valid = false;
+ {
+ LLMutexLock lock(sInstancesMutex);
+ for(std::list<LLPluginProcessParent*>::iterator iter = sInstances.begin(); iter != sInstances.end(); ++iter)
+ {
+ if(*iter == self)
+ {
+ // Lock the instance's mutex before unlocking the global mutex.
+ // This avoids a possible race condition where the instance gets deleted between this check and the servicePoll() call.
+ self->mIncomingQueueMutex.lock();
+ valid = true;
+ break;
+ }
+ }
+ }
+
+ if(valid)
+ {
+ // The instance is still valid.
+ // Pull incoming messages off the socket
+ self->servicePoll();
+ self->mIncomingQueueMutex.unlock();
+ }
+ else
+ {
+ LL_DEBUGS("PluginPoll") << "detected deleted instance " << self << LL_ENDL;
+ }
+
+ }
+ }
+ }
+ else if(APR_STATUS_IS_TIMEUP(status))
+ {
+ // timed out with no incoming data. Just return.
+ }
+ else if(status == EBADF)
+ {
+ // This happens when one of the file descriptors in the pollset is destroyed, which happens whenever a plugin's socket is closed.
+ // The pollset has been or will be recreated, so just return.
+ LL_DEBUGS("PluginPoll") << "apr_pollset_poll returned EBADF" << LL_ENDL;
+ }
+ else if(status != APR_SUCCESS)
+ {
+ LL_WARNS("PluginPoll") << "apr_pollset_poll failed with status " << status << LL_ENDL;
+ }
+ }
}
+void LLPluginProcessParent::servicePoll()
+{
+ bool result = true;
+
+ // poll signalled on this object's socket. Try to process incoming messages.
+ if(mMessagePipe)
+ {
+ result = mMessagePipe->pumpInput(0.0f);
+ }
+
+ if(!result)
+ {
+ // If we got a read error on input, remove this pipe from the pollset
+ apr_pollset_remove(sPollSet, &mPollFD);
+
+ // and tell the code not to re-add it
+ mPollFD.client_data = NULL;
+ }
+}
void LLPluginProcessParent::receiveMessageRaw(const std::string &message)
{
LL_DEBUGS("Plugin") << "Received: " << message << LL_ENDL;
-
- // FIXME: should this go into a queue instead?
LLPluginMessage parsed;
if(parsed.parse(message) != -1)
{
- receiveMessage(parsed);
+ if(parsed.hasValue("blocking_request"))
+ {
+ mBlocked = true;
+ }
+
+ if(mPolledInput)
+ {
+ // This is being called on the polling thread -- only do minimal processing/queueing.
+ receiveMessageEarly(parsed);
+ }
+ else
+ {
+ // This is not being called on the polling thread -- do full message processing at this time.
+ receiveMessage(parsed);
+ }
}
}
-void LLPluginProcessParent::receiveMessage(const LLPluginMessage &message)
+void LLPluginProcessParent::receiveMessageEarly(const LLPluginMessage &message)
{
- if(message.hasValue("blocking_request"))
+ // NOTE: this function will be called from the polling thread. It will be called with mIncomingQueueMutex _already locked_.
+
+ bool handled = false;
+
+ std::string message_class = message.getClass();
+ if(message_class == LLPLUGIN_MESSAGE_CLASS_INTERNAL)
+ {
+ // no internal messages need to be handled early.
+ }
+ else
{
- mBlocked = true;
+ // Call out to the owner and see if they to reply
+ // TODO: Should this only happen when blocked?
+ if(mOwner != NULL)
+ {
+ handled = mOwner->receivePluginMessageEarly(message);
+ }
}
+
+ if(!handled)
+ {
+ // any message that wasn't handled early needs to be queued.
+ mIncomingQueue.push(message);
+ }
+}
+void LLPluginProcessParent::receiveMessage(const LLPluginMessage &message)
+{
std::string message_class = message.getClass();
if(message_class == LLPLUGIN_MESSAGE_CLASS_INTERNAL)
{
@@ -704,12 +1080,12 @@ bool LLPluginProcessParent::pluginLockedUpOrQuit()
if(!mProcess.isRunning())
{
- LL_WARNS("Plugin") << "child exited" << llendl;
+ LL_WARNS("Plugin") << "child exited" << LL_ENDL;
result = true;
}
else if(pluginLockedUp())
{
- LL_WARNS("Plugin") << "timeout" << llendl;
+ LL_WARNS("Plugin") << "timeout" << LL_ENDL;
result = true;
}