diff options
| -rw-r--r-- | indra/llplugin/llpluginmessagepipe.cpp | 107 | ||||
| -rw-r--r-- | indra/llplugin/llpluginmessagepipe.h | 9 | ||||
| -rw-r--r-- | indra/llplugin/llpluginprocesschild.cpp | 9 | ||||
| -rw-r--r-- | indra/llplugin/llpluginprocessparent.cpp | 414 | ||||
| -rw-r--r-- | indra/llplugin/llpluginprocessparent.h | 26 | ||||
| -rw-r--r-- | indra/newview/app_settings/settings.xml | 13 | ||||
| -rw-r--r-- | indra/newview/llappviewer.cpp | 3 | ||||
| -rw-r--r-- | indra/newview/llviewermedia.cpp | 11 | ||||
| -rw-r--r-- | indra/newview/skins/default/xui/en/menu_viewer.xml | 10 | ||||
| -rw-r--r-- | indra/test_apps/llplugintest/llmediaplugintest.cpp | 14 | ||||
| -rw-r--r-- | indra/test_apps/llplugintest/llmediaplugintest.h | 2 | 
11 files changed, 568 insertions, 50 deletions
diff --git a/indra/llplugin/llpluginmessagepipe.cpp b/indra/llplugin/llpluginmessagepipe.cpp index e524c88cf8..a62534de95 100644 --- a/indra/llplugin/llpluginmessagepipe.cpp +++ b/indra/llplugin/llpluginmessagepipe.cpp @@ -96,11 +96,14 @@ void LLPluginMessagePipeOwner::killMessagePipe(void)  	}  } -LLPluginMessagePipe::LLPluginMessagePipe(LLPluginMessagePipeOwner *owner, LLSocket::ptr_t socket) +LLPluginMessagePipe::LLPluginMessagePipe(LLPluginMessagePipeOwner *owner, LLSocket::ptr_t socket): +	mInputMutex(gAPRPoolp), +	mOutputMutex(gAPRPoolp), +	mOwner(owner), +	mSocket(socket)  { -	mOwner = owner; +	  	mOwner->setMessagePipe(this); -	mSocket = socket;  }  LLPluginMessagePipe::~LLPluginMessagePipe() @@ -114,8 +117,10 @@ LLPluginMessagePipe::~LLPluginMessagePipe()  bool LLPluginMessagePipe::addMessage(const std::string &message)  {  	// queue the message for later output +	mOutputMutex.lock();  	mOutput += message;  	mOutput += MESSAGE_DELIMITER;	// message separator +	mOutputMutex.unlock();  	return true;  } @@ -149,6 +154,18 @@ void LLPluginMessagePipe::setSocketTimeout(apr_interval_time_t timeout_usec)  bool LLPluginMessagePipe::pump(F64 timeout)  { +	bool result = pumpOutput(); +	 +	if(result) +	{ +		result = pumpInput(timeout); +	} +	 +	return result; +} + +bool LLPluginMessagePipe::pumpOutput() +{  	bool result = true;  	if(mSocket) @@ -156,6 +173,7 @@ bool LLPluginMessagePipe::pump(F64 timeout)  		apr_status_t status;  		apr_size_t size; +		mOutputMutex.lock();  		if(!mOutput.empty())  		{  			// write any outgoing messages @@ -183,6 +201,17 @@ bool LLPluginMessagePipe::pump(F64 timeout)  				// remove the written part from the buffer and try again later.  				mOutput = mOutput.substr(size);  			} +			else if(APR_STATUS_IS_EOF(status)) +			{ +				// This is what we normally expect when a plugin exits. +				llinfos << "Got EOF from plugin socket. " << llendl; + +				if(mOwner) +				{ +					mOwner->socketError(status); +				} +				result = false; +			}  			else   			{  				// some other error @@ -196,6 +225,20 @@ bool LLPluginMessagePipe::pump(F64 timeout)  				result = false;  			}  		} +		mOutputMutex.unlock(); +	} +	 +	return result; +} + +bool LLPluginMessagePipe::pumpInput(F64 timeout) +{ +	bool result = true; + +	if(mSocket) +	{ +		apr_status_t status; +		apr_size_t size;  		// FIXME: For some reason, the apr timeout stuff isn't working properly on windows.  		// Until such time as we figure out why, don't try to use the socket timeout -- just sleep here instead. @@ -216,8 +259,16 @@ bool LLPluginMessagePipe::pump(F64 timeout)  			char input_buf[1024];  			apr_size_t request_size; -			// Start out by reading one byte, so that any data received will wake us up. -			request_size = 1; +			if(timeout == 0.0f) +			{ +				// If we have no timeout, start out with a full read. +				request_size = sizeof(input_buf); +			} +			else +			{ +				// Start out by reading one byte, so that any data received will wake us up. +				request_size = 1; +			}  			// and use the timeout so we'll sleep if no data is available.  			setSocketTimeout((apr_interval_time_t)(timeout * 1000000)); @@ -236,11 +287,15 @@ bool LLPluginMessagePipe::pump(F64 timeout)  //				LL_INFOS("Plugin") << "after apr_socket_recv, size = " << size << LL_ENDL;  				if(size > 0) +				{ +					mInputMutex.lock();  					mInput.append(input_buf, size); +					mInputMutex.unlock(); +				}  				if(status == APR_SUCCESS)  				{ -//					llinfos << "success, read " << size << llendl; +					LL_DEBUGS("PluginSocket") << "success, read " << size << LL_ENDL;  					if(size != request_size)  					{ @@ -250,16 +305,28 @@ bool LLPluginMessagePipe::pump(F64 timeout)  				}  				else if(APR_STATUS_IS_TIMEUP(status))  				{ -//					llinfos << "TIMEUP, read " << size << llendl; +					LL_DEBUGS("PluginSocket") << "TIMEUP, read " << size << LL_ENDL;  					// Timeout was hit.  Since the initial read is 1 byte, this should never be a partial read.  					break;  				}  				else if(APR_STATUS_IS_EAGAIN(status))  				{ -//					llinfos << "EAGAIN, read " << size << llendl; +					LL_DEBUGS("PluginSocket") << "EAGAIN, read " << size << LL_ENDL; -					// We've been doing partial reads, and we're done now. +					// Non-blocking read returned immediately. +					break; +				} +				else if(APR_STATUS_IS_EOF(status)) +				{ +					// This is what we normally expect when a plugin exits. +					LL_INFOS("PluginSocket") << "Got EOF from plugin socket. " << LL_ENDL; + +					if(mOwner) +					{ +						mOwner->socketError(status); +					} +					result = false;  					break;  				}  				else @@ -276,22 +343,18 @@ bool LLPluginMessagePipe::pump(F64 timeout)  					break;  				} -				// Second and subsequent reads should not use the timeout -				setSocketTimeout(0); -				// and should try to fill the input buffer -				request_size = sizeof(input_buf); +				if(timeout != 0.0f) +				{ +					// Second and subsequent reads should not use the timeout +					setSocketTimeout(0); +					// and should try to fill the input buffer +					request_size = sizeof(input_buf); +				}  			}  			processInput();  		}  	} - -	if(!result) -	{ -		// If we got an error, we're done. -		LL_INFOS("Plugin") << "Error from socket, cleaning up." << LL_ENDL; -		delete this; -	}  	return result;	  } @@ -300,6 +363,7 @@ void LLPluginMessagePipe::processInput(void)  {  	// Look for input delimiter(s) in the input buffer.  	int delim; +	mInputMutex.lock();  	while((delim = mInput.find(MESSAGE_DELIMITER)) != std::string::npos)  	{	  		// Let the owner process this message @@ -310,12 +374,15 @@ void LLPluginMessagePipe::processInput(void)  			// and this guarantees that the messages will get dequeued correctly.  			std::string message(mInput, 0, delim);  			mInput.erase(0, delim + 1); +			mInputMutex.unlock();  			mOwner->receiveMessageRaw(message); +			mInputMutex.lock();  		}  		else  		{  			LL_WARNS("Plugin") << "!mOwner" << LL_ENDL;  		}  	} +	mInputMutex.unlock();  } diff --git a/indra/llplugin/llpluginmessagepipe.h b/indra/llplugin/llpluginmessagepipe.h index 1ddb38de68..1b0a08254b 100644 --- a/indra/llplugin/llpluginmessagepipe.h +++ b/indra/llplugin/llpluginmessagepipe.h @@ -35,6 +35,7 @@  #define LL_LLPLUGINMESSAGEPIPE_H  #include "lliosocket.h" +#include "llthread.h"  class LLPluginMessagePipe; @@ -51,7 +52,7 @@ public:  	virtual apr_status_t socketError(apr_status_t error);  	// called from LLPluginMessagePipe to manage the connection with LLPluginMessagePipeOwner -- do not use! -	virtual void setMessagePipe(LLPluginMessagePipe *message_pipe) ; +	virtual void setMessagePipe(LLPluginMessagePipe *message_pipe);  protected:  	// returns false if writeMessageRaw() would drop the message @@ -76,14 +77,18 @@ public:  	void clearOwner(void);  	bool pump(F64 timeout = 0.0f); -	 +	bool pumpOutput(); +	bool pumpInput(F64 timeout = 0.0f); +		  protected:	  	void processInput(void);  	// used internally by pump()  	void setSocketTimeout(apr_interval_time_t timeout_usec); +	LLMutex mInputMutex;  	std::string mInput; +	LLMutex mOutputMutex;  	std::string mOutput;  	LLPluginMessagePipeOwner *mOwner; diff --git a/indra/llplugin/llpluginprocesschild.cpp b/indra/llplugin/llpluginprocesschild.cpp index 2d078cd6ed..d1cf91b253 100644 --- a/indra/llplugin/llpluginprocesschild.cpp +++ b/indra/llplugin/llpluginprocesschild.cpp @@ -85,9 +85,14 @@ void LLPluginProcessChild::idle(void)  	bool idle_again;  	do  	{ -		if(mSocketError != APR_SUCCESS) +		if(APR_STATUS_IS_EOF(mSocketError))  		{ -			LL_INFOS("Plugin") << "message pipe is in error state, moving to STATE_ERROR"<< LL_ENDL; +			// Plugin socket was closed.  This covers both normal plugin termination and host crashes. +			setState(STATE_ERROR); +		} +		else if(mSocketError != APR_SUCCESS) +		{ +			LL_INFOS("Plugin") << "message pipe is in error state (" << mSocketError << "), moving to STATE_ERROR"<< LL_ENDL;  			setState(STATE_ERROR);  		}	 diff --git a/indra/llplugin/llpluginprocessparent.cpp b/indra/llplugin/llpluginprocessparent.cpp index e273410a1d..c61a903a2c 100644 --- a/indra/llplugin/llpluginprocessparent.cpp +++ b/indra/llplugin/llpluginprocessparent.cpp @@ -45,8 +45,51 @@ LLPluginProcessParentOwner::~LLPluginProcessParentOwner()  } -LLPluginProcessParent::LLPluginProcessParent(LLPluginProcessParentOwner *owner) +bool LLPluginProcessParent::sUseReadThread = false; +apr_pollset_t *LLPluginProcessParent::sPollSet = NULL; +bool LLPluginProcessParent::sPollsetNeedsRebuild = false; +LLMutex *LLPluginProcessParent::sInstancesMutex; +std::list<LLPluginProcessParent*> LLPluginProcessParent::sInstances; +LLThread *LLPluginProcessParent::sPollThread = NULL; + + +class LLPluginProcessParentPollThread: public LLThread +{ +public: +	LLPluginProcessParentPollThread() : +		LLThread("LLPluginProcessParentPollThread", gAPRPoolp) +	{ +	} +protected: +	// Inherited from LLThread +	/*virtual*/ void run(void) +	{ +		while(!isQuitting() && LLPluginProcessParent::getUseReadThread()) +		{ +			LLPluginProcessParent::poll(0.1f); +			checkPause(); +		} +		 +		// Final poll to clean up the pollset, etc. +		LLPluginProcessParent::poll(0.0f); +	}  + +	// Inherited from LLThread +	/*virtual*/ bool runCondition(void) +	{ +		return(LLPluginProcessParent::canPollThreadRun()); +	} + +}; + +LLPluginProcessParent::LLPluginProcessParent(LLPluginProcessParentOwner *owner): +	mIncomingQueueMutex(gAPRPoolp)  { +	if(!sInstancesMutex) +	{ +		sInstancesMutex = new LLMutex(gAPRPoolp); +	} +	  	mOwner = owner;  	mBoundPort = 0;  	mState = STATE_UNINITIALIZED; @@ -55,18 +98,30 @@ LLPluginProcessParent::LLPluginProcessParent(LLPluginProcessParentOwner *owner)  	mDisableTimeout = false;  	mDebug = false;  	mBlocked = false; +	mPolledInput = false; +	mPollFD.client_data = NULL;  	mPluginLaunchTimeout = 60.0f;  	mPluginLockupTimeout = 15.0f;  	// Don't start the timer here -- start it when we actually launch the plugin process.  	mHeartbeat.stop(); +	 +	// Don't add to the global list until fully constructed. +	sInstancesMutex->lock(); +	sInstances.push_back(this); +	sInstancesMutex->unlock();  }  LLPluginProcessParent::~LLPluginProcessParent()  {  	LL_DEBUGS("Plugin") << "destructor" << LL_ENDL; +	// Remove from the global list before beginning destruction. +	sInstancesMutex->lock(); +	sInstances.remove(this); +	sInstancesMutex->unlock(); +  	// Destroy any remaining shared memory regions  	sharedMemoryRegionsType::iterator iter;  	while((iter = mSharedMemoryRegions.begin()) != mSharedMemoryRegions.end()) @@ -78,15 +133,15 @@ LLPluginProcessParent::~LLPluginProcessParent()  		mSharedMemoryRegions.erase(iter);  	} -	// orphaning the process means it won't be killed when the LLProcessLauncher is destructed. -	// This is what we want -- it should exit cleanly once it notices the sockets have been closed. -	mProcess.orphan(); +	mProcess.kill();  	killSockets();  }  void LLPluginProcessParent::killSockets(void)  { +	mIncomingQueueMutex.lock();  	killMessagePipe(); +	mIncomingQueueMutex.unlock();  	mListenSocket.reset();  	mSocket.reset();  } @@ -160,21 +215,47 @@ void LLPluginProcessParent::idle(void)  	do  	{ +		// process queued messages +		mIncomingQueueMutex.lock(); +		while(!mIncomingQueue.empty()) +		{ +			LLPluginMessage message = mIncomingQueue.front(); +			mIncomingQueue.pop(); +			mIncomingQueueMutex.unlock(); +				 +			receiveMessage(message); +			 +			mIncomingQueueMutex.lock(); +		} + +		mIncomingQueueMutex.unlock(); +		  		// Give time to network processing  		if(mMessagePipe)  		{ -			if(!mMessagePipe->pump()) +			// Drain any queued outgoing messages +			mMessagePipe->pumpOutput(); +			 +			// Only do input processing here if this instance isn't in a pollset. +			if(!mPolledInput)  			{ -//				LL_WARNS("Plugin") << "Message pipe hit an error state" << LL_ENDL; -				errorState(); +				mMessagePipe->pumpInput();  			}  		} - -		if((mSocketError != APR_SUCCESS) && (mState <= STATE_RUNNING)) +		 +		if(mState <= STATE_RUNNING)  		{ -			// The socket is in an error state -- the plugin is gone. -			LL_WARNS("Plugin") << "Socket hit an error state (" << mSocketError << ")" << LL_ENDL; -			errorState(); +			if(APR_STATUS_IS_EOF(mSocketError)) +			{ +				// Plugin socket was closed.  This covers both normal plugin termination and plugin crashes. +				errorState(); +			} +			else if(mSocketError != APR_SUCCESS) +			{ +				// The socket is in an error state -- the plugin is gone. +				LL_WARNS("Plugin") << "Socket hit an error state (" << mSocketError << ")" << LL_ENDL; +				errorState(); +			}  		}	  		// If a state needs to go directly to another state (as a performance enhancement), it can set idle_again to true after calling setState(). @@ -355,7 +436,7 @@ void LLPluginProcessParent::idle(void)  			break;  			case STATE_HELLO: -				LL_DEBUGS("Plugin") << "received hello message" << llendl; +				LL_DEBUGS("Plugin") << "received hello message" << LL_ENDL;  				// Send the message to load the plugin  				{ @@ -389,7 +470,7 @@ void LLPluginProcessParent::idle(void)  				}  				else if(pluginLockedUp())  				{ -					LL_WARNS("Plugin") << "timeout in exiting state, bailing out" << llendl; +					LL_WARNS("Plugin") << "timeout in exiting state, bailing out" << LL_ENDL;  					errorState();  				}  			break; @@ -411,8 +492,7 @@ void LLPluginProcessParent::idle(void)  			break;  			case STATE_CLEANUP: -				// Don't do a kill here anymore -- closing the sockets is the new 'kill'. -				mProcess.orphan(); +				mProcess.kill();  				killSockets();  				setState(STATE_DONE);  			break; @@ -491,29 +571,315 @@ void LLPluginProcessParent::sendMessage(const LLPluginMessage &message)  	std::string buffer = message.generate();  	LL_DEBUGS("Plugin") << "Sending: " << buffer << LL_ENDL;	  	writeMessageRaw(buffer); +	 +	// Try to send message immediately. +	if(mMessagePipe) +	{ +		mMessagePipe->pumpOutput(); +	} +} + +//virtual  +void LLPluginProcessParent::setMessagePipe(LLPluginMessagePipe *message_pipe) +{ +	bool update_pollset = false; +	 +	if(mMessagePipe) +	{ +		// Unsetting an existing message pipe -- remove from the pollset		 +		mPollFD.client_data = NULL; + +		// pollset needs an update +		update_pollset = true; +	} +	if(message_pipe != NULL) +	{ +		// Set up the apr_pollfd_t +		mPollFD.p = gAPRPoolp; +		mPollFD.desc_type = APR_POLL_SOCKET; +		mPollFD.reqevents = APR_POLLIN|APR_POLLERR|APR_POLLHUP; +		mPollFD.rtnevents = 0; +		mPollFD.desc.s = mSocket->getSocket(); +		mPollFD.client_data = (void*)this;	 +		 +		// pollset needs an update +		update_pollset = true; +	} + +	mMessagePipe = message_pipe; +	 +	if(update_pollset) +	{ +		dirtyPollSet(); +	}  } +//static  +void LLPluginProcessParent::dirtyPollSet() +{ +	sPollsetNeedsRebuild = true; +	 +	if(sPollThread) +	{ +		LL_DEBUGS("PluginPoll") << "unpausing polling thread " << LL_ENDL; +		sPollThread->unpause(); +	} +} + +void LLPluginProcessParent::updatePollset() +{ +	if(!sInstancesMutex) +	{ +		// No instances have been created yet.  There's no work to do. +		return; +	} +		 +	sInstancesMutex->lock(); + +	if(sPollSet) +	{ +		LL_DEBUGS("PluginPoll") << "destroying pollset " << sPollSet << LL_ENDL; +		// delete the existing pollset. +		apr_pollset_destroy(sPollSet); +		sPollSet = NULL; +	} +	 +	std::list<LLPluginProcessParent*>::iterator iter; +	int count = 0; +	 +	// Count the number of instances that want to be in the pollset +	for(iter = sInstances.begin(); iter != sInstances.end(); iter++) +	{ +		(*iter)->mPolledInput = false; +		if((*iter)->mPollFD.client_data) +		{ +			// This instance has a socket that needs to be polled. +			++count; +		} +	} + +	if(sUseReadThread && sPollThread && !sPollThread->isQuitting()) +	{ +		if(!sPollSet && (count > 0)) +		{ +			// The pollset doesn't exist yet.  Create it now. +			apr_status_t status = apr_pollset_create(&sPollSet, count, gAPRPoolp, APR_POLLSET_NOCOPY); +			if(status != APR_SUCCESS) +			{ +				LL_WARNS("PluginPoll") << "Couldn't create pollset.  Falling back to non-pollset mode." << LL_ENDL; +				sPollSet = NULL; +			} +			else +			{ +				LL_DEBUGS("PluginPoll") << "created pollset " << sPollSet << LL_ENDL; +				 +				// Pollset was created, add all instances to it. +				for(iter = sInstances.begin(); iter != sInstances.end(); iter++) +				{ +					if((*iter)->mPollFD.client_data) +					{ +						status = apr_pollset_add(sPollSet, &((*iter)->mPollFD)); +						if(status == APR_SUCCESS) +						{ +							(*iter)->mPolledInput = true; +						} +						else +						{ +							LL_WARNS("PluginPoll") << "apr_pollset_add failed with status " << status << LL_ENDL; +						} +					} +				} +			} +		} +	} +	sInstancesMutex->unlock(); +} + +void LLPluginProcessParent::setUseReadThread(bool use_read_thread) +{ +	if(sUseReadThread != use_read_thread) +	{ +		sUseReadThread = use_read_thread; +		 +		if(sUseReadThread) +		{ +			if(!sPollThread) +			{ +				// start up the read thread +				LL_INFOS("PluginPoll") << "creating polling thread " << LL_ENDL; + +				// make sure the pollset gets rebuilt. +				sPollsetNeedsRebuild = true; +				 +				sPollThread = new LLPluginProcessParentPollThread; +				sPollThread->start(); +			} +		} +		else +		{ +			if(sPollThread) +			{ +				// shut down the read thread +				LL_INFOS("PluginPoll") << "destroying polling thread " << LL_ENDL; +				delete sPollThread; +				sPollThread = NULL; +			} +		} + +	} +} + +void LLPluginProcessParent::poll(F64 timeout) +{ +	if(sPollsetNeedsRebuild || !sUseReadThread) +	{ +		sPollsetNeedsRebuild = false; +		updatePollset(); +	} +	 +	if(sPollSet) +	{ +		apr_status_t status; +		apr_int32_t count; +		const apr_pollfd_t *descriptors; +		status = apr_pollset_poll(sPollSet, (apr_interval_time_t)(timeout * 1000000), &count, &descriptors); +		if(status == APR_SUCCESS) +		{ +			// One or more of the descriptors signalled.  Call them. +			for(int i = 0; i < count; i++) +			{ +				LLPluginProcessParent *self = (LLPluginProcessParent *)(descriptors[i].client_data); +				// NOTE: the descriptor returned here is actually a COPY of the original (even though we create the pollset with APR_POLLSET_NOCOPY). +				// This means that even if the parent has set its mPollFD.client_data to NULL, the old pointer may still there in this descriptor. +				// It's even possible that the old pointer no longer points to a valid LLPluginProcessParent. +				// This means that we can't safely dereference the 'self' pointer here without some extra steps... +				if(self) +				{ +					// Make sure this pointer is still in the instances list +					sInstancesMutex->lock(); +					bool valid = false; +					for(std::list<LLPluginProcessParent*>::iterator iter = sInstances.begin(); iter != sInstances.end(); ++iter) +					{ +						if(*iter == self) +						{ +							// Lock the instance's mutex before unlocking the global mutex.   +							// This avoids a possible race condition where the instance gets deleted between this check and the servicePoll() call. +							self->mIncomingQueueMutex.lock(); +							valid = true; +							break; +						} +					} +					sInstancesMutex->unlock(); + +					if(valid) +					{ +						// The instance is still valid. +						// Pull incoming messages off the socket +						self->servicePoll(); +						self->mIncomingQueueMutex.unlock(); +					} +					else +					{ +						LL_DEBUGS("PluginPoll") << "detected deleted instance " << self << LL_ENDL; +					} + +				} +			} +		} +		else if(APR_STATUS_IS_TIMEUP(status)) +		{ +			// timed out with no incoming data.  Just return. +		} +		else if(status == EBADF) +		{ +			// This happens when one of the file descriptors in the pollset is destroyed, which happens whenever a plugin's socket is closed. +			// The pollset has been or will be recreated, so just return. +			LL_DEBUGS("PluginPoll") << "apr_pollset_poll returned EBADF" << LL_ENDL; +		} +		else if(status != APR_SUCCESS) +		{ +			LL_WARNS("PluginPoll") << "apr_pollset_poll failed with status " << status << LL_ENDL; +		} +	} +} + +void LLPluginProcessParent::servicePoll() +{ +	bool result = true; +	 +	// poll signalled on this object's socket.  Try to process incoming messages. +	if(mMessagePipe) +	{ +		result = mMessagePipe->pumpInput(0.0f); +	} + +	if(!result) +	{ +		// If we got a read error on input, remove this pipe from the pollset +		apr_pollset_remove(sPollSet, &mPollFD); + +		// and tell the code not to re-add it +		mPollFD.client_data = NULL; +	} +}  void LLPluginProcessParent::receiveMessageRaw(const std::string &message)  {  	LL_DEBUGS("Plugin") << "Received: " << message << LL_ENDL; - -	// FIXME: should this go into a queue instead?  	LLPluginMessage parsed;  	if(parsed.parse(message) != -1)  	{ -		receiveMessage(parsed); +		if(parsed.hasValue("blocking_request")) +		{ +			mBlocked = true; +		} + +		if(mPolledInput) +		{ +			// This is being called on the polling thread -- only do minimal processing/queueing. +			receiveMessageEarly(parsed); +		} +		else +		{ +			// This is not being called on the polling thread -- do full message processing at this time. +			receiveMessage(parsed); +		}  	}  } -void LLPluginProcessParent::receiveMessage(const LLPluginMessage &message) +void LLPluginProcessParent::receiveMessageEarly(const LLPluginMessage &message)  { -	if(message.hasValue("blocking_request")) +	// NOTE: this function will be called from the polling thread.  It will be called with mIncomingQueueMutex _already locked_.  + +	bool handled = false; +	 +	std::string message_class = message.getClass(); +	if(message_class == LLPLUGIN_MESSAGE_CLASS_INTERNAL)  	{ -		mBlocked = true; +		// no internal messages need to be handled early.  	} +	else +	{ +		// Call out to the owner and see if they to reply +		// TODO: Should this only happen when blocked? +		if(mOwner != NULL) +		{ +			handled = mOwner->receivePluginMessageEarly(message); +		} +	} +	 +	if(!handled) +	{ +		// any message that wasn't handled early needs to be queued. +//		mIncomingQueueMutex.lock(); +		mIncomingQueue.push(message); +//		mIncomingQueueMutex.unlock(); +	} +} +void LLPluginProcessParent::receiveMessage(const LLPluginMessage &message) +{  	std::string message_class = message.getClass();  	if(message_class == LLPLUGIN_MESSAGE_CLASS_INTERNAL)  	{ @@ -704,12 +1070,12 @@ bool LLPluginProcessParent::pluginLockedUpOrQuit()  	if(!mProcess.isRunning())  	{ -		LL_WARNS("Plugin") << "child exited" << llendl; +		LL_WARNS("Plugin") << "child exited" << LL_ENDL;  		result = true;  	}  	else if(pluginLockedUp())  	{ -		LL_WARNS("Plugin") << "timeout" << llendl; +		LL_WARNS("Plugin") << "timeout" << LL_ENDL;  		result = true;  	} diff --git a/indra/llplugin/llpluginprocessparent.h b/indra/llplugin/llpluginprocessparent.h index 31f125bfb3..1ad0fbf059 100644 --- a/indra/llplugin/llpluginprocessparent.h +++ b/indra/llplugin/llpluginprocessparent.h @@ -41,12 +41,14 @@  #include "llpluginsharedmemory.h"  #include "lliosocket.h" +#include "llthread.h"  class LLPluginProcessParentOwner  {  public:  	virtual ~LLPluginProcessParentOwner();  	virtual void receivePluginMessage(const LLPluginMessage &message) = 0; +	virtual bool receivePluginMessageEarly(const LLPluginMessage &message) {return false;};  	// This will only be called when the plugin has died unexpectedly   	virtual void pluginLaunchFailed() {};  	virtual void pluginDied() {}; @@ -90,7 +92,9 @@ public:  	void receiveMessage(const LLPluginMessage &message);  	// Inherited from LLPluginMessagePipeOwner -	void receiveMessageRaw(const std::string &message); +	/*virtual*/ void receiveMessageRaw(const std::string &message); +	/*virtual*/ void receiveMessageEarly(const LLPluginMessage &message); +	/*virtual*/ void setMessagePipe(LLPluginMessagePipe *message_pipe) ;  	// This adds a memory segment shared with the client, generating a name for the segment.  The name generated is guaranteed to be unique on the host.  	// The caller must call removeSharedMemory first (and wait until getSharedMemorySize returns 0 for the indicated name) before re-adding a segment with the same name. @@ -113,7 +117,11 @@ public:  	void setLockupTimeout(F32 timeout) { mPluginLockupTimeout = timeout; };  	F64 getCPUUsage() { return mCPUUsage; }; - +	 +	static void poll(F64 timeout); +	static bool canPollThreadRun() { return (sPollSet || sPollsetNeedsRebuild || sUseReadThread); }; +	static void setUseReadThread(bool use_read_thread); +	static bool getUseReadThread() { return sUseReadThread; };  private:  	enum EState @@ -164,12 +172,26 @@ private:  	bool mDisableTimeout;  	bool mDebug;  	bool mBlocked; +	bool mPolledInput;  	LLProcessLauncher mDebugger;  	F32 mPluginLaunchTimeout;		// Somewhat longer timeout for initial launch.  	F32 mPluginLockupTimeout;		// If we don't receive a heartbeat in this many seconds, we declare the plugin locked up. +	static bool sUseReadThread; +	apr_pollfd_t mPollFD; +	static apr_pollset_t *sPollSet; +	static bool sPollsetNeedsRebuild; +	static LLMutex *sInstancesMutex; +	static std::list<LLPluginProcessParent*> sInstances; +	static void dirtyPollSet(); +	static void updatePollset(); +	void servicePoll(); +	static LLThread *sPollThread; +	 +	LLMutex mIncomingQueueMutex; +	std::queue<LLPluginMessage> mIncomingQueue;  };  #endif // LL_LLPLUGINPROCESSPARENT_H diff --git a/indra/newview/app_settings/settings.xml b/indra/newview/app_settings/settings.xml index 6f11a6d616..280c3d642c 100644 --- a/indra/newview/app_settings/settings.xml +++ b/indra/newview/app_settings/settings.xml @@ -5585,6 +5585,19 @@        <key>Value</key>        <integer>8</integer>      </map> + +   <key>PluginUseReadThread</key> +    <map> +      <key>Comment</key> +      <string>Use a separate thread to read incoming messages from plugins</string> +      <key>Persist</key> +      <integer>1</integer> +      <key>Type</key> +      <string>Boolean</string> +      <key>Value</key> +      <integer>0</integer> +    </map> +      <key>PrecachingDelay</key>      <map>        <key>Comment</key> diff --git a/indra/newview/llappviewer.cpp b/indra/newview/llappviewer.cpp index 2f9bbb1407..4f7c0c3549 100644 --- a/indra/newview/llappviewer.cpp +++ b/indra/newview/llappviewer.cpp @@ -1530,6 +1530,9 @@ bool LLAppViewer::cleanup()  	LLViewerMedia::saveCookieFile(); +	// Stop the plugin read thread if it's running. +	LLPluginProcessParent::setUseReadThread(false); +  	llinfos << "Shutting down Threads" << llendflush;  	// Let threads finish diff --git a/indra/newview/llviewermedia.cpp b/indra/newview/llviewermedia.cpp index fd2bb0fdf9..8e210554fb 100644 --- a/indra/newview/llviewermedia.cpp +++ b/indra/newview/llviewermedia.cpp @@ -732,10 +732,21 @@ static bool proximity_comparitor(const LLViewerMediaImpl* i1, const LLViewerMedi  	}  } +static LLFastTimer::DeclareTimer FTM_MEDIA_UPDATE("Update Media"); +  //////////////////////////////////////////////////////////////////////////////////////////  // static  void LLViewerMedia::updateMedia(void *dummy_arg)  { +	LLFastTimer t1(FTM_MEDIA_UPDATE); +	 +	bool use_read_thread = gSavedSettings.getBOOL("PluginUseReadThread"); +	if(LLPluginProcessParent::getUseReadThread() != use_read_thread) +	{ +		// Enable/disable the plugin read thread +		LLPluginProcessParent::setUseReadThread(use_read_thread); +	} +	  	sAnyMediaShowing = false;  	sUpdatedCookies = getCookieStore()->getChangedCookies();  	if(!sUpdatedCookies.empty()) diff --git a/indra/newview/skins/default/xui/en/menu_viewer.xml b/indra/newview/skins/default/xui/en/menu_viewer.xml index 3af80f63fe..df4f33adf0 100644 --- a/indra/newview/skins/default/xui/en/menu_viewer.xml +++ b/indra/newview/skins/default/xui/en/menu_viewer.xml @@ -1339,6 +1339,16 @@               function="ToggleControl"               parameter="RunMultipleThreads" />          </menu_item_check> +        <menu_item_check +         label="Use Plugin Read Thread" +         name="Use Plugin Read Thread"> +            <menu_item_check.on_check +             function="CheckControl" +             parameter="PluginUseReadThread" /> +            <menu_item_check.on_click +             function="ToggleControl" +             parameter="PluginUseReadThread" /> +        </menu_item_check>          <menu_item_call           label="Clear Group Cache"           name="ClearGroupCache"> diff --git a/indra/test_apps/llplugintest/llmediaplugintest.cpp b/indra/test_apps/llplugintest/llmediaplugintest.cpp index 7e9a8336e7..7a544debb2 100644 --- a/indra/test_apps/llplugintest/llmediaplugintest.cpp +++ b/indra/test_apps/llplugintest/llmediaplugintest.cpp @@ -241,6 +241,9 @@ LLMediaPluginTest::~LLMediaPluginTest()  	{  		remMediaPanel( mMediaPanels[ i ] );  	}; +	 +	// Stop the plugin read thread if it's running. +	LLPluginProcessParent::setUseReadThread(false);  }  //////////////////////////////////////////////////////////////////////////////// @@ -1047,6 +1050,11 @@ void LLMediaPluginTest::gluiCallback( int control_id )  		}  	}  	else +	if ( control_id == mIdUsePluginReadThread ) +	{ +		LLPluginProcessParent::setUseReadThread(mUsePluginReadThread); +	} +	else  	if ( control_id == mIdControlCrashPlugin )  	{  		// send message to plugin and ask it to crash @@ -1431,6 +1439,12 @@ void LLMediaPluginTest::makeChrome()  	glui_window_misc_control->set_main_gfx_window( mAppWindow );  	glui_window_misc_control->add_column( true ); +	mIdUsePluginReadThread = start_id++; +	mUsePluginReadThread = 0; +	glui_window_misc_control->add_checkbox( "Use plugin read thread", &mUsePluginReadThread, mIdUsePluginReadThread, gluiCallbackWrapper ); +	glui_window_misc_control->set_main_gfx_window( mAppWindow ); +	glui_window_misc_control->add_column( true ); +  	mIdLargePanelSpacing = start_id++;  	mLargePanelSpacing = 0;  	glui_window_misc_control->add_checkbox( "Large Panel Spacing", &mLargePanelSpacing, mIdLargePanelSpacing, gluiCallbackWrapper ); diff --git a/indra/test_apps/llplugintest/llmediaplugintest.h b/indra/test_apps/llplugintest/llmediaplugintest.h index e7c7699343..5d08e42148 100644 --- a/indra/test_apps/llplugintest/llmediaplugintest.h +++ b/indra/test_apps/llplugintest/llmediaplugintest.h @@ -164,6 +164,8 @@ class LLMediaPluginTest : public LLPluginClassMediaOwner  		int mRandomBookmarks;  		int mIdDisableTimeout;  		int mDisableTimeout; +		int mIdUsePluginReadThread; +		int mUsePluginReadThread;  		int mIdLargePanelSpacing;  		int mLargePanelSpacing;  		int mIdControlCrashPlugin;  | 
