diff options
Diffstat (limited to 'indra/llplugin')
| -rw-r--r-- | indra/llplugin/llpluginmessagepipe.cpp | 104 | ||||
| -rw-r--r-- | indra/llplugin/llpluginmessagepipe.h | 9 | ||||
| -rw-r--r-- | indra/llplugin/llpluginprocesschild.cpp | 9 | ||||
| -rw-r--r-- | indra/llplugin/llpluginprocessparent.cpp | 426 | ||||
| -rw-r--r-- | indra/llplugin/llpluginprocessparent.h | 26 | 
5 files changed, 523 insertions, 51 deletions
| diff --git a/indra/llplugin/llpluginmessagepipe.cpp b/indra/llplugin/llpluginmessagepipe.cpp index e524c88cf8..89f8b44569 100644 --- a/indra/llplugin/llpluginmessagepipe.cpp +++ b/indra/llplugin/llpluginmessagepipe.cpp @@ -96,11 +96,14 @@ void LLPluginMessagePipeOwner::killMessagePipe(void)  	}  } -LLPluginMessagePipe::LLPluginMessagePipe(LLPluginMessagePipeOwner *owner, LLSocket::ptr_t socket) +LLPluginMessagePipe::LLPluginMessagePipe(LLPluginMessagePipeOwner *owner, LLSocket::ptr_t socket): +	mInputMutex(gAPRPoolp), +	mOutputMutex(gAPRPoolp), +	mOwner(owner), +	mSocket(socket)  { -	mOwner = owner; +	  	mOwner->setMessagePipe(this); -	mSocket = socket;  }  LLPluginMessagePipe::~LLPluginMessagePipe() @@ -114,6 +117,7 @@ LLPluginMessagePipe::~LLPluginMessagePipe()  bool LLPluginMessagePipe::addMessage(const std::string &message)  {  	// queue the message for later output +	LLMutexLock lock(&mOutputMutex);  	mOutput += message;  	mOutput += MESSAGE_DELIMITER;	// message separator @@ -149,6 +153,18 @@ void LLPluginMessagePipe::setSocketTimeout(apr_interval_time_t timeout_usec)  bool LLPluginMessagePipe::pump(F64 timeout)  { +	bool result = pumpOutput(); +	 +	if(result) +	{ +		result = pumpInput(timeout); +	} +	 +	return result; +} + +bool LLPluginMessagePipe::pumpOutput() +{  	bool result = true;  	if(mSocket) @@ -156,6 +172,7 @@ bool LLPluginMessagePipe::pump(F64 timeout)  		apr_status_t status;  		apr_size_t size; +		LLMutexLock lock(&mOutputMutex);  		if(!mOutput.empty())  		{  			// write any outgoing messages @@ -183,6 +200,17 @@ bool LLPluginMessagePipe::pump(F64 timeout)  				// remove the written part from the buffer and try again later.  				mOutput = mOutput.substr(size);  			} +			else if(APR_STATUS_IS_EOF(status)) +			{ +				// This is what we normally expect when a plugin exits. +				llinfos << "Got EOF from plugin socket. " << llendl; + +				if(mOwner) +				{ +					mOwner->socketError(status); +				} +				result = false; +			}  			else   			{  				// some other error @@ -196,6 +224,19 @@ bool LLPluginMessagePipe::pump(F64 timeout)  				result = false;  			}  		} +	} +	 +	return result; +} + +bool LLPluginMessagePipe::pumpInput(F64 timeout) +{ +	bool result = true; + +	if(mSocket) +	{ +		apr_status_t status; +		apr_size_t size;  		// FIXME: For some reason, the apr timeout stuff isn't working properly on windows.  		// Until such time as we figure out why, don't try to use the socket timeout -- just sleep here instead. @@ -216,8 +257,16 @@ bool LLPluginMessagePipe::pump(F64 timeout)  			char input_buf[1024];  			apr_size_t request_size; -			// Start out by reading one byte, so that any data received will wake us up. -			request_size = 1; +			if(timeout == 0.0f) +			{ +				// If we have no timeout, start out with a full read. +				request_size = sizeof(input_buf); +			} +			else +			{ +				// Start out by reading one byte, so that any data received will wake us up. +				request_size = 1; +			}  			// and use the timeout so we'll sleep if no data is available.  			setSocketTimeout((apr_interval_time_t)(timeout * 1000000)); @@ -236,11 +285,14 @@ bool LLPluginMessagePipe::pump(F64 timeout)  //				LL_INFOS("Plugin") << "after apr_socket_recv, size = " << size << LL_ENDL;  				if(size > 0) +				{ +					LLMutexLock lock(&mInputMutex);  					mInput.append(input_buf, size); +				}  				if(status == APR_SUCCESS)  				{ -//					llinfos << "success, read " << size << llendl; +					LL_DEBUGS("PluginSocket") << "success, read " << size << LL_ENDL;  					if(size != request_size)  					{ @@ -250,16 +302,28 @@ bool LLPluginMessagePipe::pump(F64 timeout)  				}  				else if(APR_STATUS_IS_TIMEUP(status))  				{ -//					llinfos << "TIMEUP, read " << size << llendl; +					LL_DEBUGS("PluginSocket") << "TIMEUP, read " << size << LL_ENDL;  					// Timeout was hit.  Since the initial read is 1 byte, this should never be a partial read.  					break;  				}  				else if(APR_STATUS_IS_EAGAIN(status))  				{ -//					llinfos << "EAGAIN, read " << size << llendl; +					LL_DEBUGS("PluginSocket") << "EAGAIN, read " << size << LL_ENDL; -					// We've been doing partial reads, and we're done now. +					// Non-blocking read returned immediately. +					break; +				} +				else if(APR_STATUS_IS_EOF(status)) +				{ +					// This is what we normally expect when a plugin exits. +					LL_INFOS("PluginSocket") << "Got EOF from plugin socket. " << LL_ENDL; + +					if(mOwner) +					{ +						mOwner->socketError(status); +					} +					result = false;  					break;  				}  				else @@ -276,22 +340,18 @@ bool LLPluginMessagePipe::pump(F64 timeout)  					break;  				} -				// Second and subsequent reads should not use the timeout -				setSocketTimeout(0); -				// and should try to fill the input buffer -				request_size = sizeof(input_buf); +				if(timeout != 0.0f) +				{ +					// Second and subsequent reads should not use the timeout +					setSocketTimeout(0); +					// and should try to fill the input buffer +					request_size = sizeof(input_buf); +				}  			}  			processInput();  		}  	} - -	if(!result) -	{ -		// If we got an error, we're done. -		LL_INFOS("Plugin") << "Error from socket, cleaning up." << LL_ENDL; -		delete this; -	}  	return result;	  } @@ -300,6 +360,7 @@ void LLPluginMessagePipe::processInput(void)  {  	// Look for input delimiter(s) in the input buffer.  	int delim; +	mInputMutex.lock();  	while((delim = mInput.find(MESSAGE_DELIMITER)) != std::string::npos)  	{	  		// Let the owner process this message @@ -310,12 +371,15 @@ void LLPluginMessagePipe::processInput(void)  			// and this guarantees that the messages will get dequeued correctly.  			std::string message(mInput, 0, delim);  			mInput.erase(0, delim + 1); +			mInputMutex.unlock();  			mOwner->receiveMessageRaw(message); +			mInputMutex.lock();  		}  		else  		{  			LL_WARNS("Plugin") << "!mOwner" << LL_ENDL;  		}  	} +	mInputMutex.unlock();  } diff --git a/indra/llplugin/llpluginmessagepipe.h b/indra/llplugin/llpluginmessagepipe.h index 1ddb38de68..1b0a08254b 100644 --- a/indra/llplugin/llpluginmessagepipe.h +++ b/indra/llplugin/llpluginmessagepipe.h @@ -35,6 +35,7 @@  #define LL_LLPLUGINMESSAGEPIPE_H  #include "lliosocket.h" +#include "llthread.h"  class LLPluginMessagePipe; @@ -51,7 +52,7 @@ public:  	virtual apr_status_t socketError(apr_status_t error);  	// called from LLPluginMessagePipe to manage the connection with LLPluginMessagePipeOwner -- do not use! -	virtual void setMessagePipe(LLPluginMessagePipe *message_pipe) ; +	virtual void setMessagePipe(LLPluginMessagePipe *message_pipe);  protected:  	// returns false if writeMessageRaw() would drop the message @@ -76,14 +77,18 @@ public:  	void clearOwner(void);  	bool pump(F64 timeout = 0.0f); -	 +	bool pumpOutput(); +	bool pumpInput(F64 timeout = 0.0f); +		  protected:	  	void processInput(void);  	// used internally by pump()  	void setSocketTimeout(apr_interval_time_t timeout_usec); +	LLMutex mInputMutex;  	std::string mInput; +	LLMutex mOutputMutex;  	std::string mOutput;  	LLPluginMessagePipeOwner *mOwner; diff --git a/indra/llplugin/llpluginprocesschild.cpp b/indra/llplugin/llpluginprocesschild.cpp index 2d078cd6ed..d1cf91b253 100644 --- a/indra/llplugin/llpluginprocesschild.cpp +++ b/indra/llplugin/llpluginprocesschild.cpp @@ -85,9 +85,14 @@ void LLPluginProcessChild::idle(void)  	bool idle_again;  	do  	{ -		if(mSocketError != APR_SUCCESS) +		if(APR_STATUS_IS_EOF(mSocketError))  		{ -			LL_INFOS("Plugin") << "message pipe is in error state, moving to STATE_ERROR"<< LL_ENDL; +			// Plugin socket was closed.  This covers both normal plugin termination and host crashes. +			setState(STATE_ERROR); +		} +		else if(mSocketError != APR_SUCCESS) +		{ +			LL_INFOS("Plugin") << "message pipe is in error state (" << mSocketError << "), moving to STATE_ERROR"<< LL_ENDL;  			setState(STATE_ERROR);  		}	 diff --git a/indra/llplugin/llpluginprocessparent.cpp b/indra/llplugin/llpluginprocessparent.cpp index e273410a1d..3589b22a77 100644 --- a/indra/llplugin/llpluginprocessparent.cpp +++ b/indra/llplugin/llpluginprocessparent.cpp @@ -45,8 +45,51 @@ LLPluginProcessParentOwner::~LLPluginProcessParentOwner()  } -LLPluginProcessParent::LLPluginProcessParent(LLPluginProcessParentOwner *owner) +bool LLPluginProcessParent::sUseReadThread = false; +apr_pollset_t *LLPluginProcessParent::sPollSet = NULL; +bool LLPluginProcessParent::sPollsetNeedsRebuild = false; +LLMutex *LLPluginProcessParent::sInstancesMutex; +std::list<LLPluginProcessParent*> LLPluginProcessParent::sInstances; +LLThread *LLPluginProcessParent::sReadThread = NULL; + + +class LLPluginProcessParentPollThread: public LLThread  { +public: +	LLPluginProcessParentPollThread() : +		LLThread("LLPluginProcessParentPollThread", gAPRPoolp) +	{ +	} +protected: +	// Inherited from LLThread +	/*virtual*/ void run(void) +	{ +		while(!isQuitting() && LLPluginProcessParent::getUseReadThread()) +		{ +			LLPluginProcessParent::poll(0.1f); +			checkPause(); +		} +		 +		// Final poll to clean up the pollset, etc. +		LLPluginProcessParent::poll(0.0f); +	}  + +	// Inherited from LLThread +	/*virtual*/ bool runCondition(void) +	{ +		return(LLPluginProcessParent::canPollThreadRun()); +	} + +}; + +LLPluginProcessParent::LLPluginProcessParent(LLPluginProcessParentOwner *owner): +	mIncomingQueueMutex(gAPRPoolp) +{ +	if(!sInstancesMutex) +	{ +		sInstancesMutex = new LLMutex(gAPRPoolp); +	} +	  	mOwner = owner;  	mBoundPort = 0;  	mState = STATE_UNINITIALIZED; @@ -55,18 +98,36 @@ LLPluginProcessParent::LLPluginProcessParent(LLPluginProcessParentOwner *owner)  	mDisableTimeout = false;  	mDebug = false;  	mBlocked = false; +	mPolledInput = false; +	mPollFD.client_data = NULL;  	mPluginLaunchTimeout = 60.0f;  	mPluginLockupTimeout = 15.0f;  	// Don't start the timer here -- start it when we actually launch the plugin process.  	mHeartbeat.stop(); +	 +	// Don't add to the global list until fully constructed. +	{ +		LLMutexLock lock(sInstancesMutex); +		sInstances.push_back(this); +	}  }  LLPluginProcessParent::~LLPluginProcessParent()  {  	LL_DEBUGS("Plugin") << "destructor" << LL_ENDL; +	// Remove from the global list before beginning destruction. +	{ +		// Make sure to get the global mutex _first_ here, to avoid a possible deadlock against LLPluginProcessParent::poll() +		LLMutexLock lock(sInstancesMutex); +		{ +			LLMutexLock lock2(&mIncomingQueueMutex); +			sInstances.remove(this); +		} +	} +  	// Destroy any remaining shared memory regions  	sharedMemoryRegionsType::iterator iter;  	while((iter = mSharedMemoryRegions.begin()) != mSharedMemoryRegions.end()) @@ -78,15 +139,17 @@ LLPluginProcessParent::~LLPluginProcessParent()  		mSharedMemoryRegions.erase(iter);  	} -	// orphaning the process means it won't be killed when the LLProcessLauncher is destructed. -	// This is what we want -- it should exit cleanly once it notices the sockets have been closed. -	mProcess.orphan(); +	mProcess.kill();  	killSockets();  }  void LLPluginProcessParent::killSockets(void)  { -	killMessagePipe(); +	{ +		LLMutexLock lock(&mIncomingQueueMutex); +		killMessagePipe(); +	} +  	mListenSocket.reset();  	mSocket.reset();  } @@ -160,21 +223,47 @@ void LLPluginProcessParent::idle(void)  	do  	{ +		// process queued messages +		mIncomingQueueMutex.lock(); +		while(!mIncomingQueue.empty()) +		{ +			LLPluginMessage message = mIncomingQueue.front(); +			mIncomingQueue.pop(); +			mIncomingQueueMutex.unlock(); +				 +			receiveMessage(message); +			 +			mIncomingQueueMutex.lock(); +		} + +		mIncomingQueueMutex.unlock(); +		  		// Give time to network processing  		if(mMessagePipe)  		{ -			if(!mMessagePipe->pump()) +			// Drain any queued outgoing messages +			mMessagePipe->pumpOutput(); +			 +			// Only do input processing here if this instance isn't in a pollset. +			if(!mPolledInput)  			{ -//				LL_WARNS("Plugin") << "Message pipe hit an error state" << LL_ENDL; -				errorState(); +				mMessagePipe->pumpInput();  			}  		} - -		if((mSocketError != APR_SUCCESS) && (mState <= STATE_RUNNING)) +		 +		if(mState <= STATE_RUNNING)  		{ -			// The socket is in an error state -- the plugin is gone. -			LL_WARNS("Plugin") << "Socket hit an error state (" << mSocketError << ")" << LL_ENDL; -			errorState(); +			if(APR_STATUS_IS_EOF(mSocketError)) +			{ +				// Plugin socket was closed.  This covers both normal plugin termination and plugin crashes. +				errorState(); +			} +			else if(mSocketError != APR_SUCCESS) +			{ +				// The socket is in an error state -- the plugin is gone. +				LL_WARNS("Plugin") << "Socket hit an error state (" << mSocketError << ")" << LL_ENDL; +				errorState(); +			}  		}	  		// If a state needs to go directly to another state (as a performance enhancement), it can set idle_again to true after calling setState(). @@ -355,7 +444,7 @@ void LLPluginProcessParent::idle(void)  			break;  			case STATE_HELLO: -				LL_DEBUGS("Plugin") << "received hello message" << llendl; +				LL_DEBUGS("Plugin") << "received hello message" << LL_ENDL;  				// Send the message to load the plugin  				{ @@ -389,7 +478,7 @@ void LLPluginProcessParent::idle(void)  				}  				else if(pluginLockedUp())  				{ -					LL_WARNS("Plugin") << "timeout in exiting state, bailing out" << llendl; +					LL_WARNS("Plugin") << "timeout in exiting state, bailing out" << LL_ENDL;  					errorState();  				}  			break; @@ -411,8 +500,7 @@ void LLPluginProcessParent::idle(void)  			break;  			case STATE_CLEANUP: -				// Don't do a kill here anymore -- closing the sockets is the new 'kill'. -				mProcess.orphan(); +				mProcess.kill();  				killSockets();  				setState(STATE_DONE);  			break; @@ -491,29 +579,317 @@ void LLPluginProcessParent::sendMessage(const LLPluginMessage &message)  	std::string buffer = message.generate();  	LL_DEBUGS("Plugin") << "Sending: " << buffer << LL_ENDL;	  	writeMessageRaw(buffer); +	 +	// Try to send message immediately. +	if(mMessagePipe) +	{ +		mMessagePipe->pumpOutput(); +	} +} + +//virtual  +void LLPluginProcessParent::setMessagePipe(LLPluginMessagePipe *message_pipe) +{ +	bool update_pollset = false; +	 +	if(mMessagePipe) +	{ +		// Unsetting an existing message pipe -- remove from the pollset		 +		mPollFD.client_data = NULL; + +		// pollset needs an update +		update_pollset = true; +	} +	if(message_pipe != NULL) +	{ +		// Set up the apr_pollfd_t +		mPollFD.p = gAPRPoolp; +		mPollFD.desc_type = APR_POLL_SOCKET; +		mPollFD.reqevents = APR_POLLIN|APR_POLLERR|APR_POLLHUP; +		mPollFD.rtnevents = 0; +		mPollFD.desc.s = mSocket->getSocket(); +		mPollFD.client_data = (void*)this;	 +		 +		// pollset needs an update +		update_pollset = true; +	} + +	mMessagePipe = message_pipe; +	 +	if(update_pollset) +	{ +		dirtyPollSet(); +	} +} + +//static  +void LLPluginProcessParent::dirtyPollSet() +{ +	sPollsetNeedsRebuild = true; +	 +	if(sReadThread) +	{ +		LL_DEBUGS("PluginPoll") << "unpausing read thread " << LL_ENDL; +		sReadThread->unpause(); +	} +} + +void LLPluginProcessParent::updatePollset() +{ +	if(!sInstancesMutex) +	{ +		// No instances have been created yet.  There's no work to do. +		return; +	} +		 +	LLMutexLock lock(sInstancesMutex); + +	if(sPollSet) +	{ +		LL_DEBUGS("PluginPoll") << "destroying pollset " << sPollSet << LL_ENDL; +		// delete the existing pollset. +		apr_pollset_destroy(sPollSet); +		sPollSet = NULL; +	} +	 +	std::list<LLPluginProcessParent*>::iterator iter; +	int count = 0; +	 +	// Count the number of instances that want to be in the pollset +	for(iter = sInstances.begin(); iter != sInstances.end(); iter++) +	{ +		(*iter)->mPolledInput = false; +		if((*iter)->mPollFD.client_data) +		{ +			// This instance has a socket that needs to be polled. +			++count; +		} +	} + +	if(sUseReadThread && sReadThread && !sReadThread->isQuitting()) +	{ +		if(!sPollSet && (count > 0)) +		{ +#ifdef APR_POLLSET_NOCOPY +			// The pollset doesn't exist yet.  Create it now. +			apr_status_t status = apr_pollset_create(&sPollSet, count, gAPRPoolp, APR_POLLSET_NOCOPY); +			if(status != APR_SUCCESS) +			{ +#endif // APR_POLLSET_NOCOPY +				LL_WARNS("PluginPoll") << "Couldn't create pollset.  Falling back to non-pollset mode." << LL_ENDL; +				sPollSet = NULL; +#ifdef APR_POLLSET_NOCOPY +			} +			else +			{ +				LL_DEBUGS("PluginPoll") << "created pollset " << sPollSet << LL_ENDL; +				 +				// Pollset was created, add all instances to it. +				for(iter = sInstances.begin(); iter != sInstances.end(); iter++) +				{ +					if((*iter)->mPollFD.client_data) +					{ +						status = apr_pollset_add(sPollSet, &((*iter)->mPollFD)); +						if(status == APR_SUCCESS) +						{ +							(*iter)->mPolledInput = true; +						} +						else +						{ +							LL_WARNS("PluginPoll") << "apr_pollset_add failed with status " << status << LL_ENDL; +						} +					} +				} +			} +#endif // APR_POLLSET_NOCOPY +		} +	} +} + +void LLPluginProcessParent::setUseReadThread(bool use_read_thread) +{ +	if(sUseReadThread != use_read_thread) +	{ +		sUseReadThread = use_read_thread; +		 +		if(sUseReadThread) +		{ +			if(!sReadThread) +			{ +				// start up the read thread +				LL_INFOS("PluginPoll") << "creating read thread " << LL_ENDL; + +				// make sure the pollset gets rebuilt. +				sPollsetNeedsRebuild = true; +				 +				sReadThread = new LLPluginProcessParentPollThread; +				sReadThread->start(); +			} +		} +		else +		{ +			if(sReadThread) +			{ +				// shut down the read thread +				LL_INFOS("PluginPoll") << "destroying read thread " << LL_ENDL; +				delete sReadThread; +				sReadThread = NULL; +			} +		} + +	} +} + +void LLPluginProcessParent::poll(F64 timeout) +{ +	if(sPollsetNeedsRebuild || !sUseReadThread) +	{ +		sPollsetNeedsRebuild = false; +		updatePollset(); +	} +	 +	if(sPollSet) +	{ +		apr_status_t status; +		apr_int32_t count; +		const apr_pollfd_t *descriptors; +		status = apr_pollset_poll(sPollSet, (apr_interval_time_t)(timeout * 1000000), &count, &descriptors); +		if(status == APR_SUCCESS) +		{ +			// One or more of the descriptors signalled.  Call them. +			for(int i = 0; i < count; i++) +			{ +				LLPluginProcessParent *self = (LLPluginProcessParent *)(descriptors[i].client_data); +				// NOTE: the descriptor returned here is actually a COPY of the original (even though we create the pollset with APR_POLLSET_NOCOPY). +				// This means that even if the parent has set its mPollFD.client_data to NULL, the old pointer may still there in this descriptor. +				// It's even possible that the old pointer no longer points to a valid LLPluginProcessParent. +				// This means that we can't safely dereference the 'self' pointer here without some extra steps... +				if(self) +				{ +					// Make sure this pointer is still in the instances list +					bool valid = false; +					{ +						LLMutexLock lock(sInstancesMutex); +						for(std::list<LLPluginProcessParent*>::iterator iter = sInstances.begin(); iter != sInstances.end(); ++iter) +						{ +							if(*iter == self) +							{ +								// Lock the instance's mutex before unlocking the global mutex.   +								// This avoids a possible race condition where the instance gets deleted between this check and the servicePoll() call. +								self->mIncomingQueueMutex.lock(); +								valid = true; +								break; +							} +						} +					} +					 +					if(valid) +					{ +						// The instance is still valid. +						// Pull incoming messages off the socket +						self->servicePoll(); +						self->mIncomingQueueMutex.unlock(); +					} +					else +					{ +						LL_DEBUGS("PluginPoll") << "detected deleted instance " << self << LL_ENDL; +					} + +				} +			} +		} +		else if(APR_STATUS_IS_TIMEUP(status)) +		{ +			// timed out with no incoming data.  Just return. +		} +		else if(status == EBADF) +		{ +			// This happens when one of the file descriptors in the pollset is destroyed, which happens whenever a plugin's socket is closed. +			// The pollset has been or will be recreated, so just return. +			LL_DEBUGS("PluginPoll") << "apr_pollset_poll returned EBADF" << LL_ENDL; +		} +		else if(status != APR_SUCCESS) +		{ +			LL_WARNS("PluginPoll") << "apr_pollset_poll failed with status " << status << LL_ENDL; +		} +	}  } +void LLPluginProcessParent::servicePoll() +{ +	bool result = true; +	 +	// poll signalled on this object's socket.  Try to process incoming messages. +	if(mMessagePipe) +	{ +		result = mMessagePipe->pumpInput(0.0f); +	} + +	if(!result) +	{ +		// If we got a read error on input, remove this pipe from the pollset +		apr_pollset_remove(sPollSet, &mPollFD); + +		// and tell the code not to re-add it +		mPollFD.client_data = NULL; +	} +}  void LLPluginProcessParent::receiveMessageRaw(const std::string &message)  {  	LL_DEBUGS("Plugin") << "Received: " << message << LL_ENDL; - -	// FIXME: should this go into a queue instead?  	LLPluginMessage parsed;  	if(parsed.parse(message) != -1)  	{ -		receiveMessage(parsed); +		if(parsed.hasValue("blocking_request")) +		{ +			mBlocked = true; +		} + +		if(mPolledInput) +		{ +			// This is being called on the polling thread -- only do minimal processing/queueing. +			receiveMessageEarly(parsed); +		} +		else +		{ +			// This is not being called on the polling thread -- do full message processing at this time. +			receiveMessage(parsed); +		}  	}  } -void LLPluginProcessParent::receiveMessage(const LLPluginMessage &message) +void LLPluginProcessParent::receiveMessageEarly(const LLPluginMessage &message)  { -	if(message.hasValue("blocking_request")) +	// NOTE: this function will be called from the polling thread.  It will be called with mIncomingQueueMutex _already locked_.  + +	bool handled = false; +	 +	std::string message_class = message.getClass(); +	if(message_class == LLPLUGIN_MESSAGE_CLASS_INTERNAL) +	{ +		// no internal messages need to be handled early. +	} +	else  	{ -		mBlocked = true; +		// Call out to the owner and see if they to reply +		// TODO: Should this only happen when blocked? +		if(mOwner != NULL) +		{ +			handled = mOwner->receivePluginMessageEarly(message); +		}  	} +	 +	if(!handled) +	{ +		// any message that wasn't handled early needs to be queued. +		mIncomingQueue.push(message); +	} +} +void LLPluginProcessParent::receiveMessage(const LLPluginMessage &message) +{  	std::string message_class = message.getClass();  	if(message_class == LLPLUGIN_MESSAGE_CLASS_INTERNAL)  	{ @@ -704,12 +1080,12 @@ bool LLPluginProcessParent::pluginLockedUpOrQuit()  	if(!mProcess.isRunning())  	{ -		LL_WARNS("Plugin") << "child exited" << llendl; +		LL_WARNS("Plugin") << "child exited" << LL_ENDL;  		result = true;  	}  	else if(pluginLockedUp())  	{ -		LL_WARNS("Plugin") << "timeout" << llendl; +		LL_WARNS("Plugin") << "timeout" << LL_ENDL;  		result = true;  	} diff --git a/indra/llplugin/llpluginprocessparent.h b/indra/llplugin/llpluginprocessparent.h index 31f125bfb3..4dff835b6a 100644 --- a/indra/llplugin/llpluginprocessparent.h +++ b/indra/llplugin/llpluginprocessparent.h @@ -41,12 +41,14 @@  #include "llpluginsharedmemory.h"  #include "lliosocket.h" +#include "llthread.h"  class LLPluginProcessParentOwner  {  public:  	virtual ~LLPluginProcessParentOwner();  	virtual void receivePluginMessage(const LLPluginMessage &message) = 0; +	virtual bool receivePluginMessageEarly(const LLPluginMessage &message) {return false;};  	// This will only be called when the plugin has died unexpectedly   	virtual void pluginLaunchFailed() {};  	virtual void pluginDied() {}; @@ -90,7 +92,9 @@ public:  	void receiveMessage(const LLPluginMessage &message);  	// Inherited from LLPluginMessagePipeOwner -	void receiveMessageRaw(const std::string &message); +	/*virtual*/ void receiveMessageRaw(const std::string &message); +	/*virtual*/ void receiveMessageEarly(const LLPluginMessage &message); +	/*virtual*/ void setMessagePipe(LLPluginMessagePipe *message_pipe) ;  	// This adds a memory segment shared with the client, generating a name for the segment.  The name generated is guaranteed to be unique on the host.  	// The caller must call removeSharedMemory first (and wait until getSharedMemorySize returns 0 for the indicated name) before re-adding a segment with the same name. @@ -113,7 +117,11 @@ public:  	void setLockupTimeout(F32 timeout) { mPluginLockupTimeout = timeout; };  	F64 getCPUUsage() { return mCPUUsage; }; - +	 +	static void poll(F64 timeout); +	static bool canPollThreadRun() { return (sPollSet || sPollsetNeedsRebuild || sUseReadThread); }; +	static void setUseReadThread(bool use_read_thread); +	static bool getUseReadThread() { return sUseReadThread; };  private:  	enum EState @@ -164,12 +172,26 @@ private:  	bool mDisableTimeout;  	bool mDebug;  	bool mBlocked; +	bool mPolledInput;  	LLProcessLauncher mDebugger;  	F32 mPluginLaunchTimeout;		// Somewhat longer timeout for initial launch.  	F32 mPluginLockupTimeout;		// If we don't receive a heartbeat in this many seconds, we declare the plugin locked up. +	static bool sUseReadThread; +	apr_pollfd_t mPollFD; +	static apr_pollset_t *sPollSet; +	static bool sPollsetNeedsRebuild; +	static LLMutex *sInstancesMutex; +	static std::list<LLPluginProcessParent*> sInstances; +	static void dirtyPollSet(); +	static void updatePollset(); +	void servicePoll(); +	static LLThread *sReadThread; +	 +	LLMutex mIncomingQueueMutex; +	std::queue<LLPluginMessage> mIncomingQueue;  };  #endif // LL_LLPLUGINPROCESSPARENT_H | 
