diff options
| -rw-r--r-- | indra/llcommon/llthread.cpp | 15 | ||||
| -rw-r--r-- | indra/llcommon/llthread.h | 1 | ||||
| -rw-r--r-- | indra/llmessage/llbuffer.cpp | 90 | ||||
| -rw-r--r-- | indra/llmessage/llbuffer.h | 25 | ||||
| -rw-r--r-- | indra/llmessage/llbufferstream.cpp | 8 | ||||
| -rw-r--r-- | indra/llmessage/llcurl.cpp | 3 | ||||
| -rw-r--r-- | indra/llmessage/lliohttpserver.cpp | 4 | ||||
| -rw-r--r-- | indra/llmessage/lliosocket.cpp | 3 | ||||
| -rw-r--r-- | indra/llmessage/llpumpio.cpp | 1 | 
9 files changed, 142 insertions, 8 deletions
| diff --git a/indra/llcommon/llthread.cpp b/indra/llcommon/llthread.cpp index 4063cc730b..a6ad6b125c 100644 --- a/indra/llcommon/llthread.cpp +++ b/indra/llcommon/llthread.cpp @@ -337,11 +337,7 @@ LLMutex::~LLMutex()  void LLMutex::lock()  { -#if LL_DARWIN -	if (mLockingThread == LLThread::currentID()) -#else -	if (mLockingThread == sThreadID) -#endif +	if(isSelfLocked())  	{ //redundant lock  		mCount++;  		return; @@ -398,6 +394,15 @@ bool LLMutex::isLocked()  	}  } +bool LLMutex::isSelfLocked() +{ +#if LL_DARWIN +	return mLockingThread == LLThread::currentID(); +#else +	return mLockingThread == sThreadID; +#endif +} +  U32 LLMutex::lockingThread() const  {  	return mLockingThread; diff --git a/indra/llcommon/llthread.h b/indra/llcommon/llthread.h index f0e0de6173..b52e70ab2e 100644 --- a/indra/llcommon/llthread.h +++ b/indra/llcommon/llthread.h @@ -151,6 +151,7 @@ public:  	void lock();		// blocks  	void unlock();  	bool isLocked(); 	// non-blocking, but does do a lock/unlock so not free +	bool isSelfLocked(); //return true if locked in a same thread  	U32 lockingThread() const; //get ID of locking thread  protected: diff --git a/indra/llmessage/llbuffer.cpp b/indra/llmessage/llbuffer.cpp index 0316797f00..250cace6e9 100644 --- a/indra/llmessage/llbuffer.cpp +++ b/indra/llmessage/llbuffer.cpp @@ -32,6 +32,9 @@  #include "llmath.h"  #include "llmemtype.h"  #include "llstl.h" +#include "llthread.h" + +#define ASSERT_LLBUFFERARRAY_MUTEX_LOCKED llassert(!mMutexp || mMutexp->isSelfLocked());  /**    * LLSegment @@ -224,7 +227,8 @@ void LLHeapBuffer::allocate(S32 size)   * LLBufferArray   */  LLBufferArray::LLBufferArray() : -	mNextBaseChannel(0) +	mNextBaseChannel(0), +	mMutexp(NULL)  {  	LLMemType m1(LLMemType::MTYPE_IO_BUFFER);  } @@ -233,6 +237,8 @@ LLBufferArray::~LLBufferArray()  {  	LLMemType m1(LLMemType::MTYPE_IO_BUFFER);  	std::for_each(mBuffers.begin(), mBuffers.end(), DeletePointer()); + +	delete mMutexp;  }  // static @@ -243,14 +249,57 @@ LLChannelDescriptors LLBufferArray::makeChannelConsumer(  	return rv;  } +void LLBufferArray::lock() +{ +	if(mMutexp) +	{ +		mMutexp->lock() ; +	} +} + +void LLBufferArray::unlock() +{ +	if(mMutexp) +	{ +		mMutexp->unlock() ; +	} +} + +LLMutex* LLBufferArray::getMutex() +{ +	return mMutexp ; +} + +void LLBufferArray::setThreaded(bool threaded) +{ +	if(threaded) +	{ +		if(!mMutexp) +		{ +			mMutexp = new LLMutex(NULL); +		} +	} +	else +	{ +		if(mMutexp) +		{ +			delete mMutexp ; +			mMutexp = NULL ; +		} +	} +} +  LLChannelDescriptors LLBufferArray::nextChannel()  {  	LLChannelDescriptors rv(mNextBaseChannel++);  	return rv;  } +//mMutexp should be locked before calling this.  S32 LLBufferArray::capacity() const  { +	ASSERT_LLBUFFERARRAY_MUTEX_LOCKED +  	S32 total = 0;  	const_buffer_iterator_t iter = mBuffers.begin();  	const_buffer_iterator_t end = mBuffers.end(); @@ -263,6 +312,8 @@ S32 LLBufferArray::capacity() const  bool LLBufferArray::append(S32 channel, const U8* src, S32 len)  { +	LLMutexLock lock(mMutexp) ; +  	LLMemType m1(LLMemType::MTYPE_IO_BUFFER);  	std::vector<LLSegment> segments;  	if(copyIntoBuffers(channel, src, len, segments)) @@ -273,8 +324,11 @@ bool LLBufferArray::append(S32 channel, const U8* src, S32 len)  	return false;  } +//mMutexp should be locked before calling this.  bool LLBufferArray::prepend(S32 channel, const U8* src, S32 len)  { +	ASSERT_LLBUFFERARRAY_MUTEX_LOCKED +  	LLMemType m1(LLMemType::MTYPE_IO_BUFFER);  	std::vector<LLSegment> segments;  	if(copyIntoBuffers(channel, src, len, segments)) @@ -293,6 +347,8 @@ bool LLBufferArray::insertAfter(  {  	LLMemType m1(LLMemType::MTYPE_IO_BUFFER);  	std::vector<LLSegment> segments; + +	LLMutexLock lock(mMutexp) ;  	if(mSegments.end() != segment)  	{  		++segment; @@ -305,8 +361,11 @@ bool LLBufferArray::insertAfter(  	return false;  } +//mMutexp should be locked before calling this.  LLBufferArray::segment_iterator_t LLBufferArray::splitAfter(U8* address)  { +	ASSERT_LLBUFFERARRAY_MUTEX_LOCKED +  	LLMemType m1(LLMemType::MTYPE_IO_BUFFER);  	segment_iterator_t end = mSegments.end();  	segment_iterator_t it = getSegment(address); @@ -335,20 +394,26 @@ LLBufferArray::segment_iterator_t LLBufferArray::splitAfter(U8* address)  	return rv;  } +//mMutexp should be locked before calling this.  LLBufferArray::segment_iterator_t LLBufferArray::beginSegment()  { +	ASSERT_LLBUFFERARRAY_MUTEX_LOCKED  	return mSegments.begin();  } +//mMutexp should be locked before calling this.  LLBufferArray::segment_iterator_t LLBufferArray::endSegment()  { +	ASSERT_LLBUFFERARRAY_MUTEX_LOCKED  	return mSegments.end();  } +//mMutexp should be locked before calling this.  LLBufferArray::segment_iterator_t LLBufferArray::constructSegmentAfter(  	U8* address,  	LLSegment& segment)  { +	ASSERT_LLBUFFERARRAY_MUTEX_LOCKED  	LLMemType m1(LLMemType::MTYPE_IO_BUFFER);  	segment_iterator_t rv = mSegments.begin();  	segment_iterator_t end = mSegments.end(); @@ -395,8 +460,10 @@ LLBufferArray::segment_iterator_t LLBufferArray::constructSegmentAfter(  	return rv;  } +//mMutexp should be locked before calling this.  LLBufferArray::segment_iterator_t LLBufferArray::getSegment(U8* address)  { +	ASSERT_LLBUFFERARRAY_MUTEX_LOCKED  	segment_iterator_t end = mSegments.end();  	if(!address)  	{ @@ -414,9 +481,11 @@ LLBufferArray::segment_iterator_t LLBufferArray::getSegment(U8* address)  	return end;  } +//mMutexp should be locked before calling this.  LLBufferArray::const_segment_iterator_t LLBufferArray::getSegment(  	U8* address) const  { +	ASSERT_LLBUFFERARRAY_MUTEX_LOCKED  	const_segment_iterator_t end = mSegments.end();  	if(!address)  	{ @@ -466,6 +535,8 @@ S32 LLBufferArray::countAfter(S32 channel, U8* start) const  	S32 count = 0;  	S32 offset = 0;  	const_segment_iterator_t it; + +	LLMutexLock lock(mMutexp) ;  	const_segment_iterator_t end = mSegments.end();  	if(start)  	{ @@ -517,6 +588,8 @@ U8* LLBufferArray::readAfter(  	len = 0;  	S32 bytes_to_copy = 0;  	const_segment_iterator_t it; + +	LLMutexLock lock(mMutexp) ;  	const_segment_iterator_t end = mSegments.end();  	if(start)  	{ @@ -568,6 +641,7 @@ U8* LLBufferArray::seek(  	U8* start,  	S32 delta) const  { +	ASSERT_LLBUFFERARRAY_MUTEX_LOCKED  	LLMemType m1(LLMemType::MTYPE_IO_BUFFER);  	const_segment_iterator_t it;  	const_segment_iterator_t end = mSegments.end(); @@ -709,9 +783,14 @@ U8* LLBufferArray::seek(  	return rv;  } +//test use only  bool LLBufferArray::takeContents(LLBufferArray& source)  {  	LLMemType m1(LLMemType::MTYPE_IO_BUFFER); + +	LLMutexLock lock(mMutexp); +	source.lock(); +  	std::copy(  		source.mBuffers.begin(),  		source.mBuffers.end(), @@ -723,13 +802,17 @@ bool LLBufferArray::takeContents(LLBufferArray& source)  		std::back_insert_iterator<segment_list_t>(mSegments));  	source.mSegments.clear();  	source.mNextBaseChannel = 0; +	source.unlock(); +  	return true;  } +//mMutexp should be locked before calling this.  LLBufferArray::segment_iterator_t LLBufferArray::makeSegment(  	S32 channel,  	S32 len)  { +	ASSERT_LLBUFFERARRAY_MUTEX_LOCKED  	LLMemType m1(LLMemType::MTYPE_IO_BUFFER);  	// start at the end of the buffers, because it is the most likely  	// to have free space. @@ -765,8 +848,10 @@ LLBufferArray::segment_iterator_t LLBufferArray::makeSegment(  	return send;  } +//mMutexp should be locked before calling this.  bool LLBufferArray::eraseSegment(const segment_iterator_t& erase_iter)  { +	ASSERT_LLBUFFERARRAY_MUTEX_LOCKED  	LLMemType m1(LLMemType::MTYPE_IO_BUFFER);  	// Find out which buffer contains the segment, and if it is found, @@ -792,13 +877,14 @@ bool LLBufferArray::eraseSegment(const segment_iterator_t& erase_iter)  	return rv;  } - +//mMutexp should be locked before calling this.  bool LLBufferArray::copyIntoBuffers(  	S32 channel,  	const U8* src,  	S32 len,  	std::vector<LLSegment>& segments)  { +	ASSERT_LLBUFFERARRAY_MUTEX_LOCKED  	LLMemType m1(LLMemType::MTYPE_IO_BUFFER);  	if(!src || !len) return false;  	S32 copied = 0; diff --git a/indra/llmessage/llbuffer.h b/indra/llmessage/llbuffer.h index 1c42b6fbc6..ccdb9fa7ee 100644 --- a/indra/llmessage/llbuffer.h +++ b/indra/llmessage/llbuffer.h @@ -39,6 +39,7 @@  #include <list>  #include <vector> +class LLMutex;  /**    * @class LLChannelDescriptors   * @brief A way simple interface to accesss channels inside a buffer @@ -564,6 +565,29 @@ public:  	 * @return Returns true on success.  	 */  	bool eraseSegment(const segment_iterator_t& iter); + +	/** +	* @brief Lock the mutex if it exists +	* This method locks mMutexp to make accessing LLBufferArray thread-safe +	*/ +	void lock(); + +	/** +	* @brief Unlock the mutex if it exists +	*/ +	void unlock(); + +	/** +	* @brief Return mMutexp +	*/ +	LLMutex* getMutex(); + +	/** +	* @brief Set LLBufferArray to be shared across threads or not +	* This method is to create mMutexp if is threaded. +	* @param threaded Indicates this LLBufferArray instance is shared across threads if true. +	*/ +	void setThreaded(bool threaded);  	//@}  protected: @@ -595,6 +619,7 @@ protected:  	S32 mNextBaseChannel;  	buffer_list_t mBuffers;  	segment_list_t mSegments; +	LLMutex* mMutexp;  };  #endif // LL_LLBUFFER_H diff --git a/indra/llmessage/llbufferstream.cpp b/indra/llmessage/llbufferstream.cpp index 6257983c43..8d8ad05ad5 100644 --- a/indra/llmessage/llbufferstream.cpp +++ b/indra/llmessage/llbufferstream.cpp @@ -31,6 +31,7 @@  #include "llbuffer.h"  #include "llmemtype.h" +#include "llthread.h"  static const S32 DEFAULT_OUTPUT_SEGMENT_SIZE = 1024 * 4; @@ -62,6 +63,7 @@ int LLBufferStreamBuf::underflow()  		return EOF;  	} +	LLMutexLock lock(mBuffer->getMutex());  	LLBufferArray::segment_iterator_t iter;  	LLBufferArray::segment_iterator_t end = mBuffer->endSegment();  	U8* last_pos = (U8*)gptr(); @@ -149,6 +151,7 @@ int LLBufferStreamBuf::overflow(int c)  	// since we got here, we have a buffer, and we have a character to  	// put on it.  	LLBufferArray::segment_iterator_t it; +	LLMutexLock lock(mBuffer->getMutex());  	it = mBuffer->makeSegment(mChannels.out(), DEFAULT_OUTPUT_SEGMENT_SIZE);  	if(it != mBuffer->endSegment())  	{ @@ -210,6 +213,7 @@ int LLBufferStreamBuf::sync()  	// *NOTE: I bet we could just --address if address is not NULL.  	// Need to think about that. +	LLMutexLock lock(mBuffer->getMutex());  	address = mBuffer->seek(mChannels.out(), address, -1);  	if(address)  	{ @@ -273,6 +277,8 @@ streampos LLBufferStreamBuf::seekoff(  			// NULL is fine  			break;  		} + +		LLMutexLock lock(mBuffer->getMutex());  		address = mBuffer->seek(mChannels.in(), base_addr, off);  		if(address)  		{ @@ -304,6 +310,8 @@ streampos LLBufferStreamBuf::seekoff(  			// NULL is fine  			break;  		} + +		LLMutexLock lock(mBuffer->getMutex());  		address = mBuffer->seek(mChannels.out(), base_addr, off);  		if(address)  		{ diff --git a/indra/llmessage/llcurl.cpp b/indra/llmessage/llcurl.cpp index 5edf0dc8c0..1ab82a273b 100644 --- a/indra/llmessage/llcurl.cpp +++ b/indra/llmessage/llcurl.cpp @@ -228,6 +228,8 @@ LLMutex* LLCurl::Easy::sHandleMutexp = NULL ;  //static  CURL* LLCurl::Easy::allocEasyHandle()  { +	llassert(LLCurl::getCurlThread()) ; +  	CURL* ret = NULL;  	LLMutexLock lock(sHandleMutexp) ; @@ -489,6 +491,7 @@ void LLCurl::Easy::prepRequest(const std::string& url,  	LLProxy::getInstance()->applyProxySettings(this);  	mOutput.reset(new LLBufferArray); +	mOutput->setThreaded(true);  	setopt(CURLOPT_WRITEFUNCTION, (void*)&curlWriteCallback);  	setopt(CURLOPT_WRITEDATA, (void*)this); diff --git a/indra/llmessage/lliohttpserver.cpp b/indra/llmessage/lliohttpserver.cpp index 73e8a69085..987f386aa3 100644 --- a/indra/llmessage/lliohttpserver.cpp +++ b/indra/llmessage/lliohttpserver.cpp @@ -818,6 +818,8 @@ LLIOPipe::EStatus LLHTTPResponder::process_impl(    			// Copy everything after mLast read to the out.  			LLBufferArray::segment_iterator_t seg_iter; + +			buffer->lock();  			seg_iter = buffer->splitAfter(mLastRead);  			if(seg_iter != buffer->endSegment())  			{ @@ -838,7 +840,7 @@ LLIOPipe::EStatus LLHTTPResponder::process_impl(  				}  #endif  			} - +			buffer->unlock();  			//  			// *FIX: get rid of extra bytes off the end  			// diff --git a/indra/llmessage/lliosocket.cpp b/indra/llmessage/lliosocket.cpp index 54ceab3422..d5b4d45821 100644 --- a/indra/llmessage/lliosocket.cpp +++ b/indra/llmessage/lliosocket.cpp @@ -445,6 +445,7 @@ LLIOPipe::EStatus LLIOSocketWriter::process_impl(  	// efficient - not only because writev() is better, but also  	// because we won't have to do as much work to find the start  	// address. +	buffer->lock();  	LLBufferArray::segment_iterator_t it;  	LLBufferArray::segment_iterator_t end = buffer->endSegment();  	LLSegment segment; @@ -524,6 +525,8 @@ LLIOPipe::EStatus LLIOSocketWriter::process_impl(  		}  	} +	buffer->unlock(); +  	PUMP_DEBUG;  	if(done && eos)  	{ diff --git a/indra/llmessage/llpumpio.cpp b/indra/llmessage/llpumpio.cpp index 0ff300efd0..f3ef4f2684 100644 --- a/indra/llmessage/llpumpio.cpp +++ b/indra/llmessage/llpumpio.cpp @@ -207,6 +207,7 @@ bool LLPumpIO::addChain(const chain_t& chain, F32 timeout, bool has_curl_request  	info.mHasCurlRequest = has_curl_request;  	info.setTimeoutSeconds(timeout);  	info.mData = LLIOPipe::buffer_ptr_t(new LLBufferArray); +	info.mData->setThreaded(has_curl_request);  	LLLinkInfo link;  #if LL_DEBUG_PIPE_TYPE_IN_PUMP  	lldebugs << "LLPumpIO::addChain() " << chain[0] << " '" | 
