diff options
| author | andreykproductengine <andreykproductengine@lindenlab.com> | 2018-04-19 18:37:35 +0300 | 
|---|---|---|
| committer | andreykproductengine <andreykproductengine@lindenlab.com> | 2018-04-19 18:37:35 +0300 | 
| commit | 4d3642b2fab163668b1d67c0a6b631417e71e02d (patch) | |
| tree | 0d8e821e90662098ed69138451c4d36a1128542f /indra | |
| parent | 1fc32d4c973275cf86d2b936e7bb3241e0155197 (diff) | |
MAINT-7626 Incorporate transfer changes into viewer
Diffstat (limited to 'indra')
| -rw-r--r-- | indra/llmessage/llxfer.cpp | 43 | ||||
| -rw-r--r-- | indra/llmessage/llxfer.h | 6 | ||||
| -rw-r--r-- | indra/llmessage/llxfer_file.cpp | 40 | ||||
| -rw-r--r-- | indra/llmessage/llxfer_file.h | 6 | ||||
| -rw-r--r-- | indra/llmessage/llxfer_mem.cpp | 22 | ||||
| -rw-r--r-- | indra/llmessage/llxfer_mem.h | 1 | ||||
| -rw-r--r-- | indra/llmessage/llxfer_vfile.cpp | 32 | ||||
| -rw-r--r-- | indra/llmessage/llxfer_vfile.h | 6 | ||||
| -rw-r--r-- | indra/llmessage/llxfermanager.cpp | 425 | ||||
| -rw-r--r-- | indra/llmessage/llxfermanager.h | 21 | 
10 files changed, 364 insertions, 238 deletions
diff --git a/indra/llmessage/llxfer.cpp b/indra/llmessage/llxfer.cpp index e0590dfdff..a42ecefd23 100644 --- a/indra/llmessage/llxfer.cpp +++ b/indra/llmessage/llxfer.cpp @@ -99,7 +99,22 @@ void LLXfer::cleanup ()  S32 LLXfer::startSend (U64 xfer_id, const LLHost &remote_host)  { -	LL_WARNS() << "undifferentiated LLXfer::startSend for " << getFileName() << LL_ENDL; +	LL_WARNS("Xfer") << "unexpected call to base class LLXfer::startSend for " << getFileName() << LL_ENDL; +	return (-1); +} + +/////////////////////////////////////////////////////////// + +void LLXfer::closeFileHandle() +{ +	LL_WARNS("Xfer") << "unexpected call to base class LLXfer::closeFileHandle for " << getFileName() << LL_ENDL; +} + +/////////////////////////////////////////////////////////// + +S32 LLXfer::reopenFileHandle() +{ +	LL_WARNS("Xfer") << "unexpected call to base class LLXfer::reopenFileHandle for " << getFileName() << LL_ENDL;  	return (-1);  } @@ -127,14 +142,14 @@ S32 LLXfer::receiveData (char *datap, S32 data_size)  	S32 retval = 0;  	if (((S32) mBufferLength + data_size) > getMaxBufferSize()) -	{ +	{	// Write existing data to disk if it's larger than the buffer size  		retval = flush();  	}  	if (!retval)  	{  		if (datap != NULL) -		{ +		{	// Append new data to mBuffer  			memcpy(&mBuffer[mBufferLength],datap,data_size);	/*Flawfinder: ignore*/  			mBufferLength += data_size;  		} @@ -248,7 +263,12 @@ void LLXfer::sendPacket(S32 packet_num)  		gMessageSystem->nextBlockFast(_PREHASH_DataPacket);  		gMessageSystem->addBinaryDataFast(_PREHASH_Data, &fdata_buf,fdata_size); -		gMessageSystem->sendMessage(mRemoteHost); +		S32 sent_something = gMessageSystem->sendMessage(mRemoteHost); +		if (sent_something == 0) +		{ +			abort(LL_ERR_CIRCUIT_GONE); +			return; +		}  		ACKTimer.reset();  		mWaitingForACK = TRUE; @@ -326,12 +346,15 @@ void LLXfer::abort (S32 result_code)  	LL_INFOS() << "Aborting xfer from " << mRemoteHost << " named " << getFileName()  			<< " - error: " << result_code << LL_ENDL; -	gMessageSystem->newMessageFast(_PREHASH_AbortXfer); -	gMessageSystem->nextBlockFast(_PREHASH_XferID); -	gMessageSystem->addU64Fast(_PREHASH_ID, mID); -	gMessageSystem->addS32Fast(_PREHASH_Result, result_code); -	 -	gMessageSystem->sendMessage(mRemoteHost); +	if (result_code != LL_ERR_CIRCUIT_GONE) +	{ +		gMessageSystem->newMessageFast(_PREHASH_AbortXfer); +		gMessageSystem->nextBlockFast(_PREHASH_XferID); +		gMessageSystem->addU64Fast(_PREHASH_ID, mID); +		gMessageSystem->addS32Fast(_PREHASH_Result, result_code); + +		gMessageSystem->sendMessage(mRemoteHost); +	}  	mStatus = e_LL_XFER_ABORTED;  } diff --git a/indra/llmessage/llxfer.h b/indra/llmessage/llxfer.h index edf5eeb82d..3be8da4fbe 100644 --- a/indra/llmessage/llxfer.h +++ b/indra/llmessage/llxfer.h @@ -62,7 +62,7 @@ class LLXfer  	S32 mXferSize;  	char *mBuffer; -	U32 mBufferLength; +	U32 mBufferLength;			// Size of valid data, not actual allocated buffer size  	U32 mBufferStartOffset;  	BOOL mBufferContainsEOF; @@ -90,7 +90,9 @@ class LLXfer  	void init(S32 chunk_size);  	virtual void cleanup(); -	virtual S32 startSend (U64 xfer_id, const LLHost &remote_host); +	virtual S32 startSend(U64 xfer_id, const LLHost &remote_host); +	virtual void closeFileHandle(); +	virtual S32 reopenFileHandle();  	virtual void sendPacket(S32 packet_num);  	virtual void sendNextPacket();  	virtual void resendLastPacket(); diff --git a/indra/llmessage/llxfer_file.cpp b/indra/llmessage/llxfer_file.cpp index 8e2ed890e7..ad55e389f3 100644 --- a/indra/llmessage/llxfer_file.cpp +++ b/indra/llmessage/llxfer_file.cpp @@ -206,7 +206,8 @@ S32 LLXfer_File::startSend (U64 xfer_id, const LLHost &remote_host)  	mBufferLength = 0;  	mBufferStartOffset = 0;	 -	 + +	// We leave the file open, assuming we'll start reading and sending soon  	mFp = LLFile::fopen(mLocalFilename,"rb");		/* Flawfinder : ignore */  	if (mFp)  	{ @@ -233,6 +234,36 @@ S32 LLXfer_File::startSend (U64 xfer_id, const LLHost &remote_host)  }  /////////////////////////////////////////////////////////// +void LLXfer_File::closeFileHandle() +{ +	if (mFp) +	{ +		fclose(mFp); +		mFp = NULL; +	} +} + +/////////////////////////////////////////////////////////// + +S32 LLXfer_File::reopenFileHandle() +{ +	S32 retval = LL_ERR_NOERR;  // presume success + +	if (mFp == NULL) +	{ +		mFp = LLFile::fopen(mLocalFilename,"rb");		/* Flawfinder : ignore */ +		if (mFp == NULL) +		{ +			LL_INFOS("Xfer") << "Warning: " << mLocalFilename << " not found when re-opening file" << LL_ENDL; +			retval = LL_ERR_FILE_NOT_FOUND; +		} +	} + +	return retval; +} + + +///////////////////////////////////////////////////////////  S32 LLXfer_File::getMaxBufferSize ()  { @@ -285,9 +316,12 @@ S32 LLXfer_File::flush()  		if (mFp)  		{ -			if (fwrite(mBuffer,1,mBufferLength,mFp) != mBufferLength) +			S32 write_size = fwrite(mBuffer,1,mBufferLength,mFp); +			if (write_size != mBufferLength)  			{ -				LL_WARNS() << "Short write" << LL_ENDL; +				LL_WARNS("Xfer") << "Non-matching write size, requested " << mBufferLength +					<< " but wrote " << write_size +					<< LL_ENDL;  			}  //			LL_INFOS() << "******* wrote " << mBufferLength << " bytes of file xfer" << LL_ENDL; diff --git a/indra/llmessage/llxfer_file.h b/indra/llmessage/llxfer_file.h index a37dda6732..ab9374549e 100644 --- a/indra/llmessage/llxfer_file.h +++ b/indra/llmessage/llxfer_file.h @@ -61,8 +61,10 @@ class LLXfer_File : public LLXfer  	virtual S32 startDownload();  	virtual S32 processEOF(); -	 -	virtual S32 startSend (U64 xfer_id, const LLHost &remote_host); + +	virtual S32 startSend(U64 xfer_id, const LLHost &remote_host); +	virtual void closeFileHandle(); +	virtual S32 reopenFileHandle();  	virtual S32 suck(S32 start_position);  	virtual S32 flush(); diff --git a/indra/llmessage/llxfer_mem.cpp b/indra/llmessage/llxfer_mem.cpp index 3bea08f2e5..78a3e4f558 100644 --- a/indra/llmessage/llxfer_mem.cpp +++ b/indra/llmessage/llxfer_mem.cpp @@ -80,28 +80,6 @@ void LLXfer_Mem::setXferSize (S32 xfer_size)  /////////////////////////////////////////////////////////// -U64 LLXfer_Mem::registerXfer(U64 xfer_id, const void *datap, const S32 length) -{ -	mID = xfer_id; - -	if (datap) -	{ -		setXferSize(length); -		if (mBuffer) -		{ -			memcpy(mBuffer,datap,length);		/* Flawfinder : ignore */ -			mBufferLength = length; -		} -		else -		{ -			xfer_id = 0; -		} -	} -	 -	mStatus = e_LL_XFER_REGISTERED; -	return (xfer_id); -} -  S32 LLXfer_Mem::startSend (U64 xfer_id, const LLHost &remote_host)  {  	S32 retval = LL_ERR_NOERR;  // presume success diff --git a/indra/llmessage/llxfer_mem.h b/indra/llmessage/llxfer_mem.h index b5adf837df..d07779de87 100644 --- a/indra/llmessage/llxfer_mem.h +++ b/indra/llmessage/llxfer_mem.h @@ -53,7 +53,6 @@ class LLXfer_Mem : public LLXfer  	virtual void cleanup();  	virtual S32 startSend (U64 xfer_id, const LLHost &remote_host); -	virtual U64 registerXfer(U64 xfer_id, const void *datap, const S32 length);  	virtual void setXferSize (S32 data_size);  	virtual S32 initializeRequest(U64 xfer_id, diff --git a/indra/llmessage/llxfer_vfile.cpp b/indra/llmessage/llxfer_vfile.cpp index 4a378d1d34..5d41180192 100644 --- a/indra/llmessage/llxfer_vfile.cpp +++ b/indra/llmessage/llxfer_vfile.cpp @@ -205,6 +205,38 @@ S32 LLXfer_VFile::startSend (U64 xfer_id, const LLHost &remote_host)  }  /////////////////////////////////////////////////////////// + +void LLXfer_VFile::closeFileHandle() +{ +	if (mVFile) +	{ +		delete mVFile; +		mVFile = NULL; +	} +} + +/////////////////////////////////////////////////////////// + +S32 LLXfer_VFile::reopenFileHandle() +{ +	S32 retval = LL_ERR_NOERR;  // presume success + +	if (mVFile == NULL) +	{ +		mVFile =new LLVFile(mVFS, mLocalID, mType, LLVFile::READ); +		if (mVFile == NULL) +		{ +			LL_WARNS("Xfer") << "LLXfer_VFile::reopenFileHandle() can't read VFS file " << mLocalID << "." << LLAssetType::lookup(mType) << LL_ENDL; +			retval = LL_ERR_FILE_NOT_FOUND; +		} +	} + +	return retval; +} + + +/////////////////////////////////////////////////////////// +  void LLXfer_VFile::setXferSize (S32 xfer_size)  {	  	LLXfer::setXferSize(xfer_size); diff --git a/indra/llmessage/llxfer_vfile.h b/indra/llmessage/llxfer_vfile.h index 048bf49dcc..b606c08122 100644 --- a/indra/llmessage/llxfer_vfile.h +++ b/indra/llmessage/llxfer_vfile.h @@ -66,8 +66,10 @@ class LLXfer_VFile : public LLXfer  	virtual S32 startDownload();  	virtual S32 processEOF(); -	 -	virtual S32 startSend (U64 xfer_id, const LLHost &remote_host); + +	virtual S32 startSend(U64 xfer_id, const LLHost &remote_host); +	virtual void closeFileHandle(); +	virtual S32 reopenFileHandle();  	virtual S32 suck(S32 start_position);  	virtual S32 flush(); diff --git a/indra/llmessage/llxfermanager.cpp b/indra/llmessage/llxfermanager.cpp index 2ceb64ce8f..8f18004791 100644 --- a/indra/llmessage/llxfermanager.cpp +++ b/indra/llmessage/llxfermanager.cpp @@ -44,6 +44,8 @@ const S32 LL_PACKET_RETRY_LIMIT = 10;            // packet retransmission limit  const S32 LL_DEFAULT_MAX_SIMULTANEOUS_XFERS = 10;  const S32 LL_DEFAULT_MAX_REQUEST_FIFO_XFERS = 1000; +const S32 LL_DEFAULT_MAX_HARD_LIMIT_SIMULTANEOUS_XFERS = 500; +  #define LL_XFER_PROGRESS_MESSAGES 0  #define LL_XFER_TEST_REXMIT       0 @@ -66,10 +68,10 @@ LLXferManager::~LLXferManager ()  void LLXferManager::init (LLVFS *vfs)  { -	mSendList = NULL; -	mReceiveList = NULL; +	cleanup();  	setMaxOutgoingXfersPerCircuit(LL_DEFAULT_MAX_SIMULTANEOUS_XFERS); +	setHardLimitOutgoingXfersPerCircuit(LL_DEFAULT_MAX_HARD_LIMIT_SIMULTANEOUS_XFERS);  	setMaxIncomingXfers(LL_DEFAULT_MAX_REQUEST_FIFO_XFERS);  	mVFS = vfs; @@ -83,29 +85,14 @@ void LLXferManager::init (LLVFS *vfs)  void LLXferManager::cleanup ()  { -	LLXfer *xferp; -	LLXfer *delp; -  	for_each(mOutgoingHosts.begin(), mOutgoingHosts.end(), DeletePointer());  	mOutgoingHosts.clear(); -	delp = mSendList; -	while (delp) -	{ -		xferp = delp->mNext; -		delete delp; -		delp = xferp; -	} -	mSendList = NULL; +	for_each(mSendList.begin(), mSendList.end(), DeletePointer()); +	mSendList.clear(); -	delp = mReceiveList; -	while (delp) -	{ -		xferp = delp->mNext; -		delete delp; -		delp = xferp; -	} -	mReceiveList = NULL; +	for_each(mReceiveList.begin(), mReceiveList.end(), DeletePointer()); +	mReceiveList.clear();  }  /////////////////////////////////////////////////////////// @@ -122,6 +109,11 @@ void LLXferManager::setMaxOutgoingXfersPerCircuit(S32 max_num)  	mMaxOutgoingXfersPerCircuit = max_num;  } +void LLXferManager::setHardLimitOutgoingXfersPerCircuit(S32 max_num) +{ +	mHardLimitOutgoingXfersPerCircuit = max_num; +} +  void LLXferManager::setUseAckThrottling(const BOOL use)  {  	mUseAckThrottling = use; @@ -148,19 +140,18 @@ void LLXferManager::setAckThrottleBPS(const F32 bps)  void LLXferManager::updateHostStatus()  { -    LLXfer *xferp; -	LLHostStatus *host_statusp = NULL; -  	for_each(mOutgoingHosts.begin(), mOutgoingHosts.end(), DeletePointer());  	mOutgoingHosts.clear(); -	for (xferp = mSendList; xferp; xferp = xferp->mNext) +	for (xfer_list_t::iterator send_iter = mSendList.begin(); +			send_iter != mSendList.end(); ++send_iter)  	{ +		LLHostStatus *host_statusp = NULL;  		for (status_list_t::iterator iter = mOutgoingHosts.begin();  			 iter != mOutgoingHosts.end(); ++iter)  		{  			host_statusp = *iter; -			if (host_statusp->mHost == xferp->mRemoteHost) +			if (host_statusp->mHost == (*send_iter)->mRemoteHost)  			{  				break;  			} @@ -170,23 +161,22 @@ void LLXferManager::updateHostStatus()  			host_statusp = new LLHostStatus();  			if (host_statusp)  			{ -				host_statusp->mHost = xferp->mRemoteHost; +				host_statusp->mHost = (*send_iter)->mRemoteHost;  				mOutgoingHosts.push_front(host_statusp);  			}  		}  		if (host_statusp)  		{ -			if (xferp->mStatus == e_LL_XFER_PENDING) +			if ((*send_iter)->mStatus == e_LL_XFER_PENDING)  			{  				host_statusp->mNumPending++;  			} -			else if (xferp->mStatus == e_LL_XFER_IN_PROGRESS) +			else if ((*send_iter)->mStatus == e_LL_XFER_IN_PROGRESS)  			{  				host_statusp->mNumActive++;  			}  		} -		 -	}	 +	}  }  /////////////////////////////////////////////////////////// @@ -209,14 +199,15 @@ void LLXferManager::printHostStatus()  /////////////////////////////////////////////////////////// -LLXfer *LLXferManager::findXfer (U64 id, LLXfer *list_head) +LLXfer *LLXferManager::findXfer(U64 id, xfer_list_t & xfer_list)  { -    LLXfer *xferp; -	for (xferp = list_head; xferp; xferp = xferp->mNext) +	for (xfer_list_t::iterator iter = xfer_list.begin(); +		 iter != xfer_list.end(); +		 ++iter)  	{ -		if (xferp->mID == id) +		if ((*iter)->mID == id)  		{ -			return(xferp); +			return(*iter);  		}  	}  	return(NULL); @@ -225,29 +216,34 @@ LLXfer *LLXferManager::findXfer (U64 id, LLXfer *list_head)  /////////////////////////////////////////////////////////// -void LLXferManager::removeXfer (LLXfer *delp, LLXfer **list_head) +void LLXferManager::removeXfer(LLXfer *delp, xfer_list_t & xfer_list)  { -	// This function assumes that delp will only occur in the list -	// zero or one times.  	if (delp) -	{ -		if (*list_head == delp) +	{	 +		std::string direction = "send"; +		if (&xfer_list == &mReceiveList)  		{ -			*list_head = delp->mNext; -			delete (delp); +			std::string direction = "receive"; +			xfer_list = mSendList;  		} -		else + +		// This assumes that delp will occur in the list once at most +		// Find the pointer in the list +		for (xfer_list_t::iterator iter = xfer_list.begin(); +			 iter != xfer_list.end(); +			 ++iter)  		{ -			LLXfer *xferp = *list_head; -			while (xferp->mNext) +			if ((*iter) == delp)  			{ -				if (xferp->mNext == delp) -				{ -					xferp->mNext = delp->mNext; -					delete (delp); -					break; -				} -				xferp = xferp->mNext; +				LL_DEBUGS("Xfer") << "Deleting xfer to host " << (*iter)->mRemoteHost +					<< " of " << (*iter)->mXferSize << " bytes" +					<< ", status " << (S32)((*iter)->mStatus) +					<< " from the " << direction << " list" +					<< LL_ENDL; + +				xfer_list.erase(iter); +				delete (delp); +				break;  			}  		}  	} @@ -272,7 +268,7 @@ U32 LLXferManager::numActiveListEntries(LLXfer *list_head)  /////////////////////////////////////////////////////////// -S32 LLXferManager::numPendingXfers(const LLHost &host) +LLHostStatus * LLXferManager::findHostStatus(const LLHost &host)  {  	LLHostStatus *host_statusp = NULL; @@ -282,26 +278,32 @@ S32 LLXferManager::numPendingXfers(const LLHost &host)  		host_statusp = *iter;  		if (host_statusp->mHost == host)  		{ -			return (host_statusp->mNumPending); +			return (host_statusp);  		}  	}  	return 0;  }  /////////////////////////////////////////////////////////// +  +S32 LLXferManager::numPendingXfers(const LLHost &host) +{ +	LLHostStatus *host_statusp = findHostStatus(host); +	if (host_statusp) +	{ +		return host_statusp->mNumPending; +	} +	return 0; +} + +///////////////////////////////////////////////////////////  S32 LLXferManager::numActiveXfers(const LLHost &host)  { -	LLHostStatus *host_statusp = NULL; - -	for (status_list_t::iterator iter = mOutgoingHosts.begin(); -		 iter != mOutgoingHosts.end(); ++iter) +	LLHostStatus *host_statusp = findHostStatus(host); +	if (host_statusp)  	{ -		host_statusp = *iter; -		if (host_statusp->mHost == host) -		{ -			return (host_statusp->mNumActive); -		} +		return host_statusp->mNumActive;  	}  	return 0;  } @@ -372,35 +374,6 @@ BOOL LLXferManager::isLastPacket(S32 packet_num)  /////////////////////////////////////////////////////////// -U64 LLXferManager::registerXfer(const void *datap, const S32 length) -{ -	LLXfer *xferp; -	U64 xfer_id = getNextID(); - -	xferp = (LLXfer *) new LLXfer_Mem(); -	if (xferp) -	{ -		xferp->mNext = mSendList; -		mSendList = xferp; - -		xfer_id = ((LLXfer_Mem *)xferp)->registerXfer(xfer_id, datap,length); - -		if (!xfer_id) -		{ -			removeXfer(xferp,&mSendList); -		} -	} -	else -	{ -		LL_ERRS() << "Xfer allocation error" << LL_ENDL; -		xfer_id = 0; -	}	 - -    return(xfer_id); -} - -/////////////////////////////////////////////////////////// -  U64 LLXferManager::requestFile(const std::string& local_filename,  								const std::string& remote_filename,  								ELLPath remote_path, @@ -411,30 +384,33 @@ U64 LLXferManager::requestFile(const std::string& local_filename,  								BOOL is_priority,  								BOOL use_big_packets)  { -	LLXfer *xferp; +	LLXfer_File* file_xfer_p = NULL; -	for (xferp = mReceiveList; xferp ; xferp = xferp->mNext) +	for (xfer_list_t::iterator iter = mReceiveList.begin(); +			iter != mReceiveList.end(); ++iter)  	{ -		if (xferp->getXferTypeTag() == LLXfer::XFER_FILE -			&& (((LLXfer_File*)xferp)->matchesLocalFilename(local_filename)) -			&& (((LLXfer_File*)xferp)->matchesRemoteFilename(remote_filename, remote_path)) -			&& (remote_host == xferp->mRemoteHost) -			&& (callback == xferp->mCallback) -			&& (user_data == xferp->mCallbackDataHandle)) - +		if ((*iter)->getXferTypeTag() == LLXfer::XFER_FILE)  		{ -			// cout << "requested a xfer already in progress" << endl; -			return xferp->mID; +			file_xfer_p = (LLXfer_File*)(*iter); +			if (file_xfer_p->matchesLocalFilename(local_filename) +				&& file_xfer_p->matchesRemoteFilename(remote_filename, remote_path) +				&& (remote_host == file_xfer_p->mRemoteHost) +				&& (callback == file_xfer_p->mCallback) +				&& (user_data == file_xfer_p->mCallbackDataHandle)) +			{ +				// Already have the request	(already in progress) +				return (*iter)->mID; +			}  		}  	}  	U64 xfer_id = 0;  	S32 chunk_size = use_big_packets ? LL_XFER_LARGE_PAYLOAD : -1; -	xferp = (LLXfer *) new LLXfer_File(chunk_size); -	if (xferp) +	file_xfer_p = new LLXfer_File(chunk_size); +	if (file_xfer_p)  	{ -		addToList(xferp, mReceiveList, is_priority); +		addToList(file_xfer_p, mReceiveList, is_priority);  		// Remove any file by the same name that happens to be lying  		// around. @@ -447,7 +423,7 @@ U64 LLXferManager::requestFile(const std::string& local_filename,  			LLFile::remove(local_filename, ENOENT);  		}  		xfer_id = getNextID(); -		((LLXfer_File *)xferp)->initializeRequest( +		((LLXfer_File *)file_xfer_p)->initializeRequest(  			xfer_id,  			local_filename,  			remote_filename, @@ -500,28 +476,32 @@ void LLXferManager::requestVFile(const LLUUID& local_id,  								 void** user_data,  								 BOOL is_priority)  { -	LLXfer *xferp; +	LLXfer_VFile * xfer_p = NULL; -	for (xferp = mReceiveList; xferp ; xferp = xferp->mNext) +	for (xfer_list_t::iterator iter = mReceiveList.begin(); +			iter != mReceiveList.end(); ++iter)  	{ -		if (xferp->getXferTypeTag() == LLXfer::XFER_VFILE -			&& (((LLXfer_VFile*)xferp)->matchesLocalFile(local_id, type)) -			&& (((LLXfer_VFile*)xferp)->matchesRemoteFile(remote_id, type)) -			&& (remote_host == xferp->mRemoteHost) -			&& (callback == xferp->mCallback) -			&& (user_data == xferp->mCallbackDataHandle)) - +		if ((*iter)->getXferTypeTag() == LLXfer::XFER_VFILE)  		{ -			// cout << "requested a xfer already in progress" << endl; -			return; +			xfer_p = (LLXfer_VFile*) (*iter); +			if (xfer_p->matchesLocalFile(local_id, type) +				&& xfer_p->matchesRemoteFile(remote_id, type) +				&& (remote_host == xfer_p->mRemoteHost) +				&& (callback == xfer_p->mCallback) +				&& (user_data == xfer_p->mCallbackDataHandle)) + +			{ +				// Have match, already in progress, don't add a duplicate +				return; +			}  		}  	} -	xferp = (LLXfer *) new LLXfer_VFile(); -	if (xferp) +	xfer_p = new LLXfer_VFile(); +	if (xfer_p)  	{ -		addToList(xferp, mReceiveList, is_priority); -		((LLXfer_VFile *)xferp)->initializeRequest(getNextID(), +		addToList(xfer_p, mReceiveList, is_priority); +		((LLXfer_VFile *)xfer_p)->initializeRequest(getNextID(),  			vfs,  			local_id,  			remote_id, @@ -663,7 +643,7 @@ void LLXferManager::processReceiveData (LLMessageSystem *mesgsys, void ** /*user  	if (result == LL_ERR_CANNOT_OPEN_FILE)  	{  			xferp->abort(LL_ERR_CANNOT_OPEN_FILE); -			removeXfer(xferp,&mReceiveList); +			removeXfer(xferp,mReceiveList);  			startPendingDownloads();  			return;		  	} @@ -688,7 +668,7 @@ void LLXferManager::processReceiveData (LLMessageSystem *mesgsys, void ** /*user  	if (isLastPacket(packetnum))  	{  		xferp->processEOF(); -		removeXfer(xferp,&mReceiveList); +		removeXfer(xferp,mReceiveList);  		startPendingDownloads();  	}  } @@ -708,6 +688,7 @@ void LLXferManager::sendConfirmPacket (LLMessageSystem *mesgsys, U64 id, S32 pac  	mesgsys->addU64Fast(_PREHASH_ID, id);  	mesgsys->addU32Fast(_PREHASH_Packet, packetnum); +	// Ignore a circuit failure here, we'll catch it with another message  	mesgsys->sendMessage(remote_host);  } @@ -825,7 +806,7 @@ void LLXferManager::processFileRequest (LLMessageSystem *mesgsys, void ** /*user  	LLXfer *xferp;  	if (uuid != LLUUID::null) -	{ +	{	// Request for an asset - use a VFS file  		if(NULL == LLAssetType::lookup(type))  		{  			LL_WARNS() << "Invalid type for xfer request: " << uuid << ":" @@ -844,8 +825,7 @@ void LLXferManager::processFileRequest (LLMessageSystem *mesgsys, void ** /*user  		xferp = (LLXfer *)new LLXfer_VFile(mVFS, uuid, type);  		if (xferp)  		{ -			xferp->mNext = mSendList; -			mSendList = xferp;	 +			mSendList.push_front(xferp);  			result = xferp->startSend(id,mesgsys->getSender());  		}  		else @@ -854,7 +834,7 @@ void LLXferManager::processFileRequest (LLMessageSystem *mesgsys, void ** /*user  		}  	}  	else if (!local_filename.empty()) -	{ +	{	// Was given a file name to send  		// See DEV-21775 for detailed security issues  		if (local_path == LL_PATH_NONE) @@ -912,8 +892,7 @@ void LLXferManager::processFileRequest (LLMessageSystem *mesgsys, void ** /*user  		if (xferp)  		{ -			xferp->mNext = mSendList; -			mSendList = xferp;	 +			mSendList.push_front(xferp);  			result = xferp->startSend(id,mesgsys->getSender());  		}  		else @@ -922,7 +901,7 @@ void LLXferManager::processFileRequest (LLMessageSystem *mesgsys, void ** /*user  		}  	}  	else -	{ +	{	// no uuid or filename - use the ID sent  		char U64_BUF[MAX_STRING];		/* Flawfinder : ignore */  		LL_INFOS() << "starting memory transfer: "  			<< U64_to_str(id, U64_BUF, sizeof(U64_BUF)) << " to " @@ -946,7 +925,7 @@ void LLXferManager::processFileRequest (LLMessageSystem *mesgsys, void ** /*user  		if (xferp)  		{  			xferp->abort(result); -			removeXfer(xferp,&mSendList); +			removeXfer(xferp, mSendList);  		}  		else // can happen with a memory transfer not found  		{ @@ -960,24 +939,86 @@ void LLXferManager::processFileRequest (LLMessageSystem *mesgsys, void ** /*user  			mesgsys->sendMessage(mesgsys->getSender());		  		}  	} -	else if(xferp && (numActiveXfers(xferp->mRemoteHost) < mMaxOutgoingXfersPerCircuit)) -	{ -		xferp->sendNextPacket(); -		changeNumActiveXfers(xferp->mRemoteHost,1); -//		LL_INFOS() << "***STARTING XFER IMMEDIATELY***" << LL_ENDL; -	} -	else +	else if(xferp)  	{ -		if(xferp) +		// Figure out how many transfers the host has requested +		LLHostStatus *host_statusp = findHostStatus(xferp->mRemoteHost); +		if (host_statusp)  		{ -			LL_INFOS() << "  queueing xfer request, " << numPendingXfers(xferp->mRemoteHost) << " ahead of this one" << LL_ENDL; +			if (host_statusp->mNumActive < mMaxOutgoingXfersPerCircuit) +			{	// Not many transfers in progress already, so start immediately +				xferp->sendNextPacket(); +				changeNumActiveXfers(xferp->mRemoteHost,1); +				LL_DEBUGS("Xfer") << "Starting xfer ID " << U64_to_str(id) << " immediately" << LL_ENDL; +			} +			else if (mHardLimitOutgoingXfersPerCircuit == 0 || +				     (host_statusp->mNumActive + host_statusp->mNumPending) < mHardLimitOutgoingXfersPerCircuit) +			{	// Must close the file handle and wait for earlier ones to complete +				LL_INFOS("Xfer") << "  queueing xfer request id " << U64_to_str(id) << ", "  +								 << host_statusp->mNumActive << " active and " +								 << host_statusp->mNumPending << " pending ahead of this one"  +								 << LL_ENDL; +				xferp->closeFileHandle();	// Close the file handle until we're ready to send again +			} +			else if (mHardLimitOutgoingXfersPerCircuit > 0) +			{	// Way too many requested ... it's time to stop being nice and kill the circuit +				xferp->closeFileHandle();	// Close the file handle in any case +				LLCircuitData *cdp = gMessageSystem->mCircuitInfo.findCircuit(xferp->mRemoteHost); +				if (cdp) +				{ +					if (cdp->getTrusted()) +					{	// Trusted internal circuit - don't kill it +						LL_WARNS("Xfer") << "Trusted circuit to " << xferp->mRemoteHost << " has too many xfer requests in the queue "  +										<< host_statusp->mNumActive << " active and " +										<< host_statusp->mNumPending << " pending ahead of this one"  +										<< LL_ENDL; +					} +					else +					{	// Untrusted circuit - time to stop messing around and kill it +						LL_WARNS("Xfer") << "Killing circuit to " << xferp->mRemoteHost << " for having too many xfer requests in the queue "  +										<< host_statusp->mNumActive << " active and " +										<< host_statusp->mNumPending << " pending ahead of this one"  +										<< LL_ENDL; +						gMessageSystem->disableCircuit(xferp->mRemoteHost); +					} +				} +				else +				{	// WTF?   Why can't we find a circuit?  Try to kill it off +					LL_WARNS("Xfer") << "Backlog with circuit to " << xferp->mRemoteHost << " with too many xfer requests in the queue "  +									<< host_statusp->mNumActive << " active and " +									<< host_statusp->mNumPending << " pending ahead of this one"  +									<< " but no LLCircuitData found???" +									<< LL_ENDL; +					gMessageSystem->disableCircuit(xferp->mRemoteHost); +				} +			}  		}  		else  		{ -			LL_WARNS() << "LLXferManager::processFileRequest() - no xfer found!" -					<< LL_ENDL; +			LL_WARNS("Xfer") << "LLXferManager::processFileRequest() - no LLHostStatus found for id " << U64_to_str(id)	 +				<< " host " << xferp->mRemoteHost << LL_ENDL;  		}  	} +	else +	{ +		LL_WARNS("Xfer") << "LLXferManager::processFileRequest() - no xfer found for id " << U64_to_str(id)	<< LL_ENDL; +	} +} +  +/////////////////////////////////////////////////////////// + +// Return true if host is in a transfer-flood sitation.  Same check for both internal and external hosts +bool LLXferManager::isHostFlooded(const LLHost & host) +{ +	bool flooded = false; +	LLHostStatus *host_statusp = findHostStatus(host); +	if (host_statusp) +	{ +		flooded = (mHardLimitOutgoingXfersPerCircuit > 0 && +				    (host_statusp->mNumActive + host_statusp->mNumPending) >= (S32)(mHardLimitOutgoingXfersPerCircuit * 0.8f)); +	} + +	return flooded;  } @@ -1002,52 +1043,57 @@ void LLXferManager::processConfirmation (LLMessageSystem *mesgsys, void ** /*use  		}  		else  		{ -			removeXfer(xferp, &mSendList); +			removeXfer(xferp, mSendList);  		}  	}  }  /////////////////////////////////////////////////////////// -void LLXferManager::retransmitUnackedPackets () +void LLXferManager::retransmitUnackedPackets()  {  	LLXfer *xferp; -	LLXfer *delp; -	xferp = mReceiveList; -	while(xferp) + +	xfer_list_t::iterator iter = mReceiveList.begin(); +	while (iter != mReceiveList.end())  	{ +		xferp = (*iter);  		if (xferp->mStatus == e_LL_XFER_IN_PROGRESS)  		{  			// if the circuit dies, abort  			if (! gMessageSystem->mCircuitInfo.isCircuitAlive( xferp->mRemoteHost ))  			{ -				LL_INFOS() << "Xfer found in progress on dead circuit, aborting" << LL_ENDL; +				LL_WARNS("Xfer") << "Xfer found in progress on dead circuit, aborting transfer to "  +					<< xferp->mRemoteHost.getIPandPort() +					<< LL_ENDL;  				xferp->mCallbackResult = LL_ERR_CIRCUIT_GONE;  				xferp->processEOF(); -				delp = xferp; -				xferp = xferp->mNext; -				removeXfer(delp,&mReceiveList); + +				iter = mReceiveList.erase(iter);	// iter is set to next one after the deletion point +				delete (xferp);  				continue;   			}  		} -		xferp = xferp->mNext; +		++iter;  	} -	xferp = mSendList;   	updateHostStatus(); +  	F32 et; -	while (xferp) +	iter = mSendList.begin(); +	while (iter != mSendList.end())  	{ +		xferp = (*iter);  		if (xferp->mWaitingForACK && ( (et = xferp->ACKTimer.getElapsedTimeF32()) > LL_PACKET_TIMEOUT))  		{  			if (xferp->mRetries > LL_PACKET_RETRY_LIMIT)  			{  				LL_INFOS() << "dropping xfer " << xferp->mRemoteHost << ":" << xferp->getFileName() << " packet retransmit limit exceeded, xfer dropped" << LL_ENDL;  				xferp->abort(LL_ERR_TCP_TIMEOUT); -				delp = xferp; -				xferp = xferp->mNext; -				removeXfer(delp,&mSendList); +				iter = mSendList.erase(iter); +				delete xferp; +				continue;  			}  			else  			{ @@ -1060,25 +1106,37 @@ void LLXferManager::retransmitUnackedPackets ()  		{  			LL_INFOS() << "registered xfer never requested, xfer dropped" << LL_ENDL;  			xferp->abort(LL_ERR_TCP_TIMEOUT); -			delp = xferp; -			xferp = xferp->mNext; -			removeXfer(delp,&mSendList); +			iter = mSendList.erase(iter); +			delete xferp; +			continue;  		}  		else if (xferp->mStatus == e_LL_XFER_ABORTED)  		{  			LL_WARNS() << "Removing aborted xfer " << xferp->mRemoteHost << ":" << xferp->getFileName() << LL_ENDL; -			delp = xferp; -			xferp = xferp->mNext; -			removeXfer(delp,&mSendList); +			iter = mSendList.erase(iter); +			delete xferp; +			continue;  		}  		else if (xferp->mStatus == e_LL_XFER_PENDING)  		{  //			LL_INFOS() << "*** numActiveXfers = " << numActiveXfers(xferp->mRemoteHost) << "        mMaxOutgoingXfersPerCircuit = " << mMaxOutgoingXfersPerCircuit << LL_ENDL;     			if (numActiveXfers(xferp->mRemoteHost) < mMaxOutgoingXfersPerCircuit)  			{ -//			    LL_INFOS() << "bumping pending xfer to active" << LL_ENDL; -				xferp->sendNextPacket(); -				changeNumActiveXfers(xferp->mRemoteHost,1); +				if (xferp->reopenFileHandle()) +				{ +					LL_WARNS("Xfer") << "Error re-opening file handle for xfer ID " << U64_to_str(xferp->mID) +						<< " to host " << xferp->mRemoteHost << LL_ENDL; +					xferp->abort(LL_ERR_CANNOT_OPEN_FILE); +					iter = mSendList.erase(iter); +					delete xferp; +					continue; +				} +				else +				{	// No error re-opening the file, send the first packet +					LL_DEBUGS("Xfer") << "Moving pending xfer ID " << U64_to_str(xferp->mID) << " to active" << LL_ENDL; +					xferp->sendNextPacket(); +					changeNumActiveXfers(xferp->mRemoteHost,1); +				}  			}			  			xferp = xferp->mNext;  		} @@ -1124,7 +1182,7 @@ void LLXferManager::abortRequestById(U64 xfer_id, S32 result_code)  		{  			xferp->mCallbackResult = result_code;  			xferp->processEOF(); //should notify requester -			removeXfer(xferp, &mReceiveList); +			removeXfer(xferp, mReceiveList);  		}  		// Since already removed or marked as aborted no need  		// to wait for processAbort() to start new download @@ -1148,7 +1206,7 @@ void LLXferManager::processAbort (LLMessageSystem *mesgsys, void ** /*user_data*  	{  		xferp->mCallbackResult = result_code;  		xferp->processEOF(); -		removeXfer(xferp, &mReceiveList); +		removeXfer(xferp, mReceiveList);  		startPendingDownloads();  	}  } @@ -1164,12 +1222,15 @@ void LLXferManager::startPendingDownloads()  	// requests get pushed toward the back. Thus, if we didn't do a  	// stateful iteration, it would be possible for old requests to  	// never start. -	LLXfer* xferp = mReceiveList; +	LLXfer* xferp;  	std::list<LLXfer*> pending_downloads;  	S32 download_count = 0;  	S32 pending_count = 0; -	while(xferp) +	for (xfer_list_t::iterator iter = mReceiveList.begin(); +		 iter != mReceiveList.end(); +		 ++iter)  	{ +		xferp = (*iter);  		if(xferp->mStatus == e_LL_XFER_PENDING)  		{  			++pending_count; @@ -1209,29 +1270,15 @@ void LLXferManager::startPendingDownloads()  /////////////////////////////////////////////////////////// -void LLXferManager::addToList(LLXfer* xferp, LLXfer*& head, BOOL is_priority) +void LLXferManager::addToList(LLXfer* xferp, xfer_list_t & xfer_list, BOOL is_priority)  {  	if(is_priority)  	{ -		xferp->mNext = NULL; -		LLXfer* next = head; -		if(next) -		{ -			while(next->mNext) -			{ -				next = next->mNext; -			} -			next->mNext = xferp; -		} -		else -		{ -			head = xferp; -		} +		xfer_list.push_back(xferp);  	}  	else  	{ -		xferp->mNext = head; -		head = xferp; +		xfer_list.push_front(xferp);  	}  } diff --git a/indra/llmessage/llxfermanager.h b/indra/llmessage/llxfermanager.h index d258f0a5ce..ebe8e57ee2 100644 --- a/indra/llmessage/llxfermanager.h +++ b/indra/llmessage/llxfermanager.h @@ -77,6 +77,7 @@ class LLXferManager   protected:  	S32    mMaxOutgoingXfersPerCircuit; +	S32    mHardLimitOutgoingXfersPerCircuit;	// At this limit, kill off the connection  	S32    mMaxIncomingXfers;  	BOOL	mUseAckThrottling; // Use ack throttling to cap file xfer bandwidth @@ -92,8 +93,10 @@ class LLXferManager  		HIGH_PRIORITY = TRUE,  	}; -	LLXfer *mSendList; -	LLXfer *mReceiveList; +	// Linked FIFO list, add to the front and pull from back +	typedef std::deque<LLXfer *> xfer_list_t; +	xfer_list_t mSendList; +	xfer_list_t mReceiveList;  	typedef std::list<LLHostStatus*> status_list_t;  	status_list_t mOutgoingHosts; @@ -102,7 +105,7 @@ class LLXferManager   protected:  	// implementation methods  	virtual void startPendingDownloads(); -	virtual void addToList(LLXfer* xferp, LLXfer*& head, BOOL is_priority); +	virtual void addToList(LLXfer* xferp, xfer_list_t & list, BOOL is_priority);  	std::multiset<std::string> mExpectedTransfers; // files that are authorized to transfer out  	std::multiset<std::string> mExpectedRequests;  // files that are authorized to be downloaded on top of @@ -117,14 +120,18 @@ class LLXferManager  	void setAckThrottleBPS(const F32 bps);  // list management routines -	virtual LLXfer *findXfer(U64 id, LLXfer *list_head); -	virtual void removeXfer (LLXfer *delp, LLXfer **list_head); +	virtual LLXfer *findXfer(U64 id, xfer_list_t & xfer_list); +	virtual void removeXfer (LLXfer *delp, xfer_list_t & xfer_list); + +	LLHostStatus * findHostStatus(const LLHost &host);  	virtual U32 numActiveListEntries(LLXfer *list_head);  	virtual S32 numActiveXfers(const LLHost &host);  	virtual S32 numPendingXfers(const LLHost &host); +  	virtual void changeNumActiveXfers(const LLHost &host, S32 delta);  	virtual void setMaxOutgoingXfersPerCircuit (S32 max_num); +	virtual void setHardLimitOutgoingXfersPerCircuit(S32 max_num);  	virtual void setMaxIncomingXfers(S32 max_num);  	virtual void updateHostStatus();  	virtual void printHostStatus(); @@ -136,8 +143,6 @@ class LLXferManager  	virtual S32 decodePacketNum(S32 packet_num);	  	virtual BOOL isLastPacket(S32 packet_num); -	virtual U64 registerXfer(const void *datap, const S32 length); -  // file requesting routines  // .. to file  	virtual U64 requestFile(const std::string& local_filename, @@ -204,6 +209,8 @@ class LLXferManager  // error handling  	void abortRequestById(U64 xfer_id, S32 result_code);  	virtual void processAbort (LLMessageSystem *mesgsys, void **user_data); + +	virtual bool isHostFlooded(const LLHost & host);  };  extern LLXferManager*	gXferManager;  | 
