diff options
Diffstat (limited to 'indra/llmessage')
-rw-r--r-- | indra/llmessage/llinstantmessage.cpp | 208 | ||||
-rw-r--r-- | indra/llmessage/llinstantmessage.h | 53 | ||||
-rw-r--r-- | indra/llmessage/llxfer.cpp | 58 | ||||
-rw-r--r-- | indra/llmessage/llxfer.h | 7 | ||||
-rw-r--r-- | indra/llmessage/llxfer_file.cpp | 68 | ||||
-rw-r--r-- | indra/llmessage/llxfer_file.h | 6 | ||||
-rw-r--r-- | indra/llmessage/llxfer_mem.cpp | 22 | ||||
-rw-r--r-- | indra/llmessage/llxfer_mem.h | 1 | ||||
-rw-r--r-- | indra/llmessage/llxfer_vfile.cpp | 92 | ||||
-rw-r--r-- | indra/llmessage/llxfer_vfile.h | 8 | ||||
-rw-r--r-- | indra/llmessage/llxfermanager.cpp | 735 | ||||
-rw-r--r-- | indra/llmessage/llxfermanager.h | 50 |
12 files changed, 598 insertions, 710 deletions
diff --git a/indra/llmessage/llinstantmessage.cpp b/indra/llmessage/llinstantmessage.cpp index b7f3e6e4f7..dd5a655d7e 100644 --- a/indra/llmessage/llinstantmessage.cpp +++ b/indra/llmessage/llinstantmessage.cpp @@ -51,98 +51,6 @@ const std::string INTERACTIVE_SYSTEM_FROM("F387446C-37C4-45f2-A438-D99CBDBB563B" const S32 IM_TTL = 1; -/** - * LLIMInfo - */ -LLIMInfo::LLIMInfo() : - mFromGroup(FALSE), - mParentEstateID(0), - mOffline(0), - mViewerThinksToIsOnline(false), - mIMType(IM_NOTHING_SPECIAL), - mTimeStamp(0), - mTTL(IM_TTL) -{ -} - -LLIMInfo::LLIMInfo( - const LLUUID& from_id, - BOOL from_group, - const LLUUID& to_id, - EInstantMessage im_type, - const std::string& name, - const std::string& message, - const LLUUID& id, - U32 parent_estate_id, - const LLUUID& region_id, - const LLVector3& position, - LLSD data, - U8 offline, - U32 timestamp, - S32 ttl) : - mFromID(from_id), - mFromGroup(from_group), - mToID(to_id), - mParentEstateID(0), - mRegionID(region_id), - mPosition(position), - mOffline(offline), - mViewerThinksToIsOnline(false), - mIMType(im_type), - mID(id), - mTimeStamp(timestamp), - mName(name), - mMessage(message), - mData(data), - mTTL(ttl) -{ -} - -LLIMInfo::LLIMInfo(LLMessageSystem* msg, S32 ttl) : - mViewerThinksToIsOnline(false), - mTTL(ttl) -{ - unpackMessageBlock(msg); -} - -LLIMInfo::~LLIMInfo() -{ -} - -void LLIMInfo::packInstantMessage(LLMessageSystem* msg) const -{ - LL_DEBUGS() << "LLIMInfo::packInstantMessage()" << LL_ENDL; - msg->newMessageFast(_PREHASH_ImprovedInstantMessage); - packMessageBlock(msg); -} - -void LLIMInfo::packMessageBlock(LLMessageSystem* msg) const -{ - // Construct binary bucket - std::vector<U8> bucket; - if (mData.has("binary_bucket")) - { - bucket = mData["binary_bucket"].asBinary(); - } - pack_instant_message_block( - msg, - mFromID, - mFromGroup, - LLUUID::null, - mToID, - mName, - mMessage, - mOffline, - mIMType, - mID, - mParentEstateID, - mRegionID, - mPosition, - mTimeStamp, - &bucket[0], - bucket.size()); -} - void pack_instant_message( LLMessageSystem* msg, const LLUUID& from_id, @@ -253,120 +161,4 @@ void pack_instant_message_block( msg->addBinaryDataFast(_PREHASH_BinaryBucket, bb, binary_bucket_size); } -void LLIMInfo::unpackMessageBlock(LLMessageSystem* msg) -{ - msg->getUUIDFast(_PREHASH_AgentData, _PREHASH_AgentID, mFromID); - msg->getBOOLFast(_PREHASH_MessageBlock, _PREHASH_FromGroup, mFromGroup); - msg->getUUIDFast(_PREHASH_MessageBlock, _PREHASH_ToAgentID, mToID); - msg->getU32Fast(_PREHASH_MessageBlock, _PREHASH_ParentEstateID, mParentEstateID); - msg->getUUIDFast(_PREHASH_MessageBlock, _PREHASH_RegionID, mRegionID); - msg->getVector3Fast(_PREHASH_MessageBlock, _PREHASH_Position, mPosition); - msg->getU8Fast(_PREHASH_MessageBlock, _PREHASH_Offline, mOffline); - U8 dialog; - msg->getU8Fast(_PREHASH_MessageBlock, _PREHASH_Dialog, dialog); - mIMType = (EInstantMessage) dialog; - msg->getUUIDFast(_PREHASH_MessageBlock, _PREHASH_ID, mID); - msg->getU32Fast(_PREHASH_MessageBlock, _PREHASH_Timestamp, mTimeStamp); - msg->getStringFast(_PREHASH_MessageBlock, _PREHASH_FromAgentName, mName); - - msg->getStringFast(_PREHASH_MessageBlock, _PREHASH_Message, mMessage); - - S32 binary_bucket_size = llmin( - MTUBYTES, - msg->getSizeFast( - _PREHASH_MessageBlock, - _PREHASH_BinaryBucket)); - if(binary_bucket_size > 0) - { - std::vector<U8> bucket; - bucket.resize(binary_bucket_size); - - msg->getBinaryDataFast( - _PREHASH_MessageBlock, - _PREHASH_BinaryBucket, - &bucket[0], - 0, - 0, - binary_bucket_size); - mData["binary_bucket"] = bucket; - } - else - { - mData.clear(); - } -} - -LLSD im_info_to_llsd(LLPointer<LLIMInfo> im_info) -{ - LLSD param_version; - param_version["version"] = 1; - LLSD param_message; - param_message["from_id"] = im_info->mFromID; - param_message["from_group"] = im_info->mFromGroup; - param_message["to_id"] = im_info->mToID; - param_message["from_name"] = im_info->mName; - param_message["message"] = im_info->mMessage; - param_message["type"] = (S32)im_info->mIMType; - param_message["id"] = im_info->mID; - param_message["timestamp"] = (S32)im_info->mTimeStamp; - param_message["offline"] = (S32)im_info->mOffline; - param_message["parent_estate_id"] = (S32)im_info->mParentEstateID; - param_message["region_id"] = im_info->mRegionID; - param_message["position"] = ll_sd_from_vector3(im_info->mPosition); - param_message["data"] = im_info->mData; - param_message["ttl"] = im_info->mTTL; - - LLSD param_agent; - param_agent["agent_id"] = im_info->mFromID; - - LLSD params; - params["version_params"] = param_version; - params["message_params"] = param_message; - params["agent_params"] = param_agent; - - return params; -} - -LLPointer<LLIMInfo> llsd_to_im_info(const LLSD& im_info_sd) -{ - LLSD param_message = im_info_sd["message_params"]; - LLSD param_agent = im_info_sd["agent_params"]; - - LLPointer<LLIMInfo> im_info = new LLIMInfo( - param_message["from_id"].asUUID(), - param_message["from_group"].asBoolean(), - param_message["to_id"].asUUID(), - (EInstantMessage) param_message["type"].asInteger(), - param_message["from_name"].asString(), - param_message["message"].asString(), - param_message["id"].asUUID(), - (U32) param_message["parent_estate_id"].asInteger(), - param_message["region_id"].asUUID(), - ll_vector3_from_sd(param_message["position"]), - param_message["data"], - (U8) param_message["offline"].asInteger(), - (U32) param_message["timestamp"].asInteger(), - param_message["ttl"].asInteger()); - - return im_info; -} - -LLPointer<LLIMInfo> LLIMInfo::clone() -{ - return new LLIMInfo( - mFromID, - mFromGroup, - mToID, - mIMType, - mName, - mMessage, - mID, - mParentEstateID, - mRegionID, - mPosition, - mData, - mOffline, - mTimeStamp, - mTTL); -} diff --git a/indra/llmessage/llinstantmessage.h b/indra/llmessage/llinstantmessage.h index f7118f8ccf..55cda15405 100644 --- a/indra/llmessage/llinstantmessage.h +++ b/indra/llmessage/llinstantmessage.h @@ -177,59 +177,6 @@ extern const std::string INTERACTIVE_SYSTEM_FROM; // Number of retry attempts on sending the im. extern const S32 IM_TTL; - -class LLIMInfo : public LLRefCount -{ -protected: - LLIMInfo(); - ~LLIMInfo(); - -public: - LLIMInfo(LLMessageSystem* msg, - S32 ttl = IM_TTL); - - LLIMInfo( - const LLUUID& from_id, - BOOL from_group, - const LLUUID& to_id, - EInstantMessage im_type, - const std::string& name, - const std::string& message, - const LLUUID& id, - U32 parent_estate_id, - const LLUUID& region_id, - const LLVector3& position, - LLSD data, - U8 offline, - U32 timestamp, - S32 ttl = IM_TTL); - - void packInstantMessage(LLMessageSystem* msg) const; - void packMessageBlock(LLMessageSystem* msg) const; - void unpackMessageBlock(LLMessageSystem* msg); - LLPointer<LLIMInfo> clone(); -public: - LLUUID mFromID; - BOOL mFromGroup; - LLUUID mToID; - U32 mParentEstateID; - LLUUID mRegionID; - LLVector3 mPosition; - U8 mOffline; - bool mViewerThinksToIsOnline; - EInstantMessage mIMType; - LLUUID mID; - U32 mTimeStamp; - std::string mName; - std::string mMessage; - LLSD mData; - - S32 mTTL; -}; - -LLPointer<LLIMInfo> llsd_to_im_info(const LLSD& im_info_sd); -LLSD im_info_to_llsd(LLPointer<LLIMInfo> im_info); - void pack_instant_message( LLMessageSystem* msgsystem, const LLUUID& from_id, diff --git a/indra/llmessage/llxfer.cpp b/indra/llmessage/llxfer.cpp index e0590dfdff..c8b9d5d19f 100644 --- a/indra/llmessage/llxfer.cpp +++ b/indra/llmessage/llxfer.cpp @@ -63,7 +63,6 @@ void LLXfer::init (S32 chunk_size) mXferSize = 0; mStatus = e_LL_XFER_UNINITIALIZED; - mNext = NULL; mWaitingForACK = FALSE; mCallback = NULL; @@ -99,7 +98,22 @@ void LLXfer::cleanup () S32 LLXfer::startSend (U64 xfer_id, const LLHost &remote_host) { - LL_WARNS() << "undifferentiated LLXfer::startSend for " << getFileName() << LL_ENDL; + LL_WARNS("Xfer") << "unexpected call to base class LLXfer::startSend for " << getFileName() << LL_ENDL; + return (-1); +} + +/////////////////////////////////////////////////////////// + +void LLXfer::closeFileHandle() +{ + LL_WARNS("Xfer") << "unexpected call to base class LLXfer::closeFileHandle for " << getFileName() << LL_ENDL; +} + +/////////////////////////////////////////////////////////// + +S32 LLXfer::reopenFileHandle() +{ + LL_WARNS("Xfer") << "unexpected call to base class LLXfer::reopenFileHandle for " << getFileName() << LL_ENDL; return (-1); } @@ -115,7 +129,7 @@ void LLXfer::setXferSize (S32 xfer_size) S32 LLXfer::startDownload() { - LL_WARNS() << "undifferentiated LLXfer::startDownload for " << getFileName() + LL_WARNS("Xfer") << "undifferentiated LLXfer::startDownload for " << getFileName() << LL_ENDL; return (-1); } @@ -127,20 +141,20 @@ S32 LLXfer::receiveData (char *datap, S32 data_size) S32 retval = 0; if (((S32) mBufferLength + data_size) > getMaxBufferSize()) - { + { // Write existing data to disk if it's larger than the buffer size retval = flush(); } if (!retval) { if (datap != NULL) - { + { // Append new data to mBuffer memcpy(&mBuffer[mBufferLength],datap,data_size); /*Flawfinder: ignore*/ mBufferLength += data_size; } else { - LL_ERRS() << "NULL data passed in receiveData" << LL_ENDL; + LL_ERRS("Xfer") << "NULL data passed in receiveData" << LL_ENDL; } } @@ -163,7 +177,7 @@ S32 LLXfer::flush() S32 LLXfer::suck(S32 start_position) { - LL_WARNS() << "Attempted to send a packet outside the buffer bounds in LLXfer::suck()" << LL_ENDL; + LL_WARNS("Xfer") << "Attempted to send a packet outside the buffer bounds in LLXfer::suck()" << LL_ENDL; return (-1); } @@ -196,7 +210,7 @@ void LLXfer::sendPacket(S32 packet_num) if (fdata_size < 0) { - LL_WARNS() << "negative data size in xfer send, aborting" << LL_ENDL; + LL_WARNS("Xfer") << "negative data size in xfer send, aborting" << LL_ENDL; abort(LL_ERR_EOF); return; } @@ -248,7 +262,12 @@ void LLXfer::sendPacket(S32 packet_num) gMessageSystem->nextBlockFast(_PREHASH_DataPacket); gMessageSystem->addBinaryDataFast(_PREHASH_Data, &fdata_buf,fdata_size); - gMessageSystem->sendMessage(mRemoteHost); + S32 sent_something = gMessageSystem->sendMessage(mRemoteHost); + if (sent_something == 0) + { + abort(LL_ERR_CIRCUIT_GONE); + return; + } ACKTimer.reset(); mWaitingForACK = TRUE; @@ -289,12 +308,12 @@ S32 LLXfer::processEOF() if (LL_ERR_NOERR == mCallbackResult) { - LL_INFOS() << "xfer from " << mRemoteHost << " complete: " << getFileName() + LL_INFOS("Xfer") << "xfer from " << mRemoteHost << " complete: " << getFileName() << LL_ENDL; } else { - LL_INFOS() << "xfer from " << mRemoteHost << " failed or aborted, code " + LL_INFOS("Xfer") << "xfer from " << mRemoteHost << " failed, code " << mCallbackResult << ": " << getFileName() << LL_ENDL; } @@ -323,15 +342,18 @@ void LLXfer::abort (S32 result_code) { mCallbackResult = result_code; - LL_INFOS() << "Aborting xfer from " << mRemoteHost << " named " << getFileName() + LL_INFOS("Xfer") << "Aborting xfer from " << mRemoteHost << " named " << getFileName() << " - error: " << result_code << LL_ENDL; - gMessageSystem->newMessageFast(_PREHASH_AbortXfer); - gMessageSystem->nextBlockFast(_PREHASH_XferID); - gMessageSystem->addU64Fast(_PREHASH_ID, mID); - gMessageSystem->addS32Fast(_PREHASH_Result, result_code); - - gMessageSystem->sendMessage(mRemoteHost); + if (result_code != LL_ERR_CIRCUIT_GONE) + { + gMessageSystem->newMessageFast(_PREHASH_AbortXfer); + gMessageSystem->nextBlockFast(_PREHASH_XferID); + gMessageSystem->addU64Fast(_PREHASH_ID, mID); + gMessageSystem->addS32Fast(_PREHASH_Result, result_code); + + gMessageSystem->sendMessage(mRemoteHost); + } mStatus = e_LL_XFER_ABORTED; } diff --git a/indra/llmessage/llxfer.h b/indra/llmessage/llxfer.h index edf5eeb82d..a906674dec 100644 --- a/indra/llmessage/llxfer.h +++ b/indra/llmessage/llxfer.h @@ -54,7 +54,6 @@ class LLXfer S32 mChunkSize; public: - LLXfer *mNext; U64 mID; S32 mPacketNum; @@ -62,7 +61,7 @@ class LLXfer S32 mXferSize; char *mBuffer; - U32 mBufferLength; + U32 mBufferLength; // Size of valid data, not actual allocated buffer size U32 mBufferStartOffset; BOOL mBufferContainsEOF; @@ -90,7 +89,9 @@ class LLXfer void init(S32 chunk_size); virtual void cleanup(); - virtual S32 startSend (U64 xfer_id, const LLHost &remote_host); + virtual S32 startSend(U64 xfer_id, const LLHost &remote_host); + virtual void closeFileHandle(); + virtual S32 reopenFileHandle(); virtual void sendPacket(S32 packet_num); virtual void sendNextPacket(); virtual void resendLastPacket(); diff --git a/indra/llmessage/llxfer_file.cpp b/indra/llmessage/llxfer_file.cpp index 8e2ed890e7..7fd4222fb7 100644 --- a/indra/llmessage/llxfer_file.cpp +++ b/indra/llmessage/llxfer_file.cpp @@ -102,12 +102,12 @@ void LLXfer_File::cleanup () if (mDeleteLocalOnCompletion) { - LL_DEBUGS() << "Removing file: " << mLocalFilename << LL_ENDL; + LL_DEBUGS("Xfer") << "Removing file: " << mLocalFilename << LL_ENDL; LLFile::remove(mLocalFilename, ENOENT); } else { - LL_DEBUGS() << "Keeping local file: " << mLocalFilename << LL_ENDL; + LL_DEBUGS("Xfer") << "Keeping local file: " << mLocalFilename << LL_ENDL; } LLXfer::cleanup(); @@ -139,7 +139,7 @@ S32 LLXfer_File::initializeRequest(U64 xfer_id, mCallbackDataHandle = user_data; mCallbackResult = LL_ERR_NOERR; - LL_INFOS() << "Requesting xfer from " << remote_host << " for file: " << mLocalFilename << LL_ENDL; + LL_INFOS("Xfer") << "Requesting xfer from " << remote_host << " for file: " << mLocalFilename << LL_ENDL; if (mBuffer) { @@ -167,6 +167,7 @@ S32 LLXfer_File::startDownload() fclose(mFp); mFp = NULL; + // tbd - is it premature to send this message if the queue is backed up? gMessageSystem->newMessageFast(_PREHASH_RequestXfer); gMessageSystem->nextBlockFast(_PREHASH_XferID); gMessageSystem->addU64Fast(_PREHASH_ID, mID); @@ -182,7 +183,7 @@ S32 LLXfer_File::startDownload() } else { - LL_WARNS() << "Couldn't create file to be received!" << LL_ENDL; + LL_WARNS("Xfer") << "Couldn't create file to be received!" << LL_ENDL; retval = -1; } @@ -206,7 +207,8 @@ S32 LLXfer_File::startSend (U64 xfer_id, const LLHost &remote_host) mBufferLength = 0; mBufferStartOffset = 0; - + + // We leave the file open, assuming we'll start reading and sending soon mFp = LLFile::fopen(mLocalFilename,"rb"); /* Flawfinder : ignore */ if (mFp) { @@ -223,7 +225,7 @@ S32 LLXfer_File::startSend (U64 xfer_id, const LLHost &remote_host) } else { - LL_INFOS() << "Warning: " << mLocalFilename << " not found." << LL_ENDL; + LL_INFOS("Xfer") << "Warning: " << mLocalFilename << " not found." << LL_ENDL; return (LL_ERR_FILE_NOT_FOUND); } @@ -233,6 +235,36 @@ S32 LLXfer_File::startSend (U64 xfer_id, const LLHost &remote_host) } /////////////////////////////////////////////////////////// +void LLXfer_File::closeFileHandle() +{ + if (mFp) + { + fclose(mFp); + mFp = NULL; + } +} + +/////////////////////////////////////////////////////////// + +S32 LLXfer_File::reopenFileHandle() +{ + S32 retval = LL_ERR_NOERR; // presume success + + if (mFp == NULL) + { + mFp = LLFile::fopen(mLocalFilename,"rb"); /* Flawfinder : ignore */ + if (mFp == NULL) + { + LL_INFOS("Xfer") << "Warning: " << mLocalFilename << " not found when re-opening file" << LL_ENDL; + retval = LL_ERR_FILE_NOT_FOUND; + } + } + + return retval; +} + + +/////////////////////////////////////////////////////////// S32 LLXfer_File::getMaxBufferSize () { @@ -279,18 +311,21 @@ S32 LLXfer_File::flush() { if (mFp) { - LL_ERRS() << "Overwriting open file pointer!" << LL_ENDL; + LL_ERRS("Xfer") << "Overwriting open file pointer!" << LL_ENDL; } mFp = LLFile::fopen(mTempFilename,"a+b"); /* Flawfinder : ignore */ if (mFp) { - if (fwrite(mBuffer,1,mBufferLength,mFp) != mBufferLength) + S32 write_size = fwrite(mBuffer,1,mBufferLength,mFp); + if (write_size != mBufferLength) { - LL_WARNS() << "Short write" << LL_ENDL; + LL_WARNS("Xfer") << "Non-matching write size, requested " << mBufferLength + << " but wrote " << write_size + << LL_ENDL; } -// LL_INFOS() << "******* wrote " << mBufferLength << " bytes of file xfer" << LL_ENDL; +// LL_INFOS("Xfer") << "******* wrote " << mBufferLength << " bytes of file xfer" << LL_ENDL; fclose(mFp); mFp = NULL; @@ -298,7 +333,7 @@ S32 LLXfer_File::flush() } else { - LL_WARNS() << "LLXfer_File::flush() unable to open " << mTempFilename << " for writing!" << LL_ENDL; + LL_WARNS("Xfer") << "LLXfer_File::flush() unable to open " << mTempFilename << " for writing!" << LL_ENDL; retval = LL_ERR_CANNOT_OPEN_FILE; } } @@ -329,18 +364,18 @@ S32 LLXfer_File::processEOF() { #if !LL_WINDOWS S32 error_number = errno; - LL_INFOS() << "Rename failure (" << error_number << ") - " + LL_INFOS("Xfer") << "Rename failure (" << error_number << ") - " << mTempFilename << " to " << mLocalFilename << LL_ENDL; if(EXDEV == error_number) { if(copy_file(mTempFilename, mLocalFilename) == 0) { - LL_INFOS() << "Rename across mounts; copying+unlinking the file instead." << LL_ENDL; + LL_INFOS("Xfer") << "Rename across mounts; copying+unlinking the file instead." << LL_ENDL; unlink(mTempFilename.c_str()); } else { - LL_WARNS() << "Copy failure - " << mTempFilename << " to " + LL_WARNS("Xfer") << "Copy failure - " << mTempFilename << " to " << mLocalFilename << LL_ENDL; } } @@ -354,11 +389,11 @@ S32 LLXfer_File::processEOF() //LL_WARNS() << "File " << mLocalFilename << " does " // << (!fp ? "not" : "" ) << " exit." << LL_ENDL; //if(fp) fclose(fp); - LL_WARNS() << "Rename fatally failed, can only handle EXDEV (" + LL_WARNS("Xfer") << "Rename fatally failed, can only handle EXDEV (" << EXDEV << ")" << LL_ENDL; } #else - LL_WARNS() << "Rename failure - " << mTempFilename << " to " + LL_WARNS("Xfer") << "Rename failure - " << mTempFilename << " to " << mLocalFilename << LL_ENDL; #endif } @@ -437,3 +472,4 @@ S32 copy_file(const std::string& from, const std::string& to) return rv; } #endif + diff --git a/indra/llmessage/llxfer_file.h b/indra/llmessage/llxfer_file.h index a37dda6732..ab9374549e 100644 --- a/indra/llmessage/llxfer_file.h +++ b/indra/llmessage/llxfer_file.h @@ -61,8 +61,10 @@ class LLXfer_File : public LLXfer virtual S32 startDownload(); virtual S32 processEOF(); - - virtual S32 startSend (U64 xfer_id, const LLHost &remote_host); + + virtual S32 startSend(U64 xfer_id, const LLHost &remote_host); + virtual void closeFileHandle(); + virtual S32 reopenFileHandle(); virtual S32 suck(S32 start_position); virtual S32 flush(); diff --git a/indra/llmessage/llxfer_mem.cpp b/indra/llmessage/llxfer_mem.cpp index 3bea08f2e5..78a3e4f558 100644 --- a/indra/llmessage/llxfer_mem.cpp +++ b/indra/llmessage/llxfer_mem.cpp @@ -80,28 +80,6 @@ void LLXfer_Mem::setXferSize (S32 xfer_size) /////////////////////////////////////////////////////////// -U64 LLXfer_Mem::registerXfer(U64 xfer_id, const void *datap, const S32 length) -{ - mID = xfer_id; - - if (datap) - { - setXferSize(length); - if (mBuffer) - { - memcpy(mBuffer,datap,length); /* Flawfinder : ignore */ - mBufferLength = length; - } - else - { - xfer_id = 0; - } - } - - mStatus = e_LL_XFER_REGISTERED; - return (xfer_id); -} - S32 LLXfer_Mem::startSend (U64 xfer_id, const LLHost &remote_host) { S32 retval = LL_ERR_NOERR; // presume success diff --git a/indra/llmessage/llxfer_mem.h b/indra/llmessage/llxfer_mem.h index b5adf837df..d07779de87 100644 --- a/indra/llmessage/llxfer_mem.h +++ b/indra/llmessage/llxfer_mem.h @@ -53,7 +53,6 @@ class LLXfer_Mem : public LLXfer virtual void cleanup(); virtual S32 startSend (U64 xfer_id, const LLHost &remote_host); - virtual U64 registerXfer(U64 xfer_id, const void *datap, const S32 length); virtual void setXferSize (S32 data_size); virtual S32 initializeRequest(U64 xfer_id, diff --git a/indra/llmessage/llxfer_vfile.cpp b/indra/llmessage/llxfer_vfile.cpp index 4a378d1d34..ddc24342f6 100644 --- a/indra/llmessage/llxfer_vfile.cpp +++ b/indra/llmessage/llxfer_vfile.cpp @@ -79,8 +79,20 @@ void LLXfer_VFile::init (LLVFS *vfs, const LLUUID &local_id, LLAssetType::EType void LLXfer_VFile::cleanup () { - LLVFile file(mVFS, mTempID, mType, LLVFile::WRITE); - file.remove(); + if (mTempID.notNull() && + mDeleteTempFile) + { + if (mVFS->getExists(mTempID, mType)) + { + LLVFile file(mVFS, mTempID, mType, LLVFile::WRITE); + file.remove(); + } + else + { + LL_WARNS("Xfer") << "LLXfer_VFile::cleanup() can't open to delete VFS file " << mTempID << "." << LLAssetType::lookup(mType) + << ", mRemoteID is " << mRemoteID << LL_ENDL; + } + } delete mVFile; mVFile = NULL; @@ -118,7 +130,7 @@ S32 LLXfer_VFile::initializeRequest(U64 xfer_id, mName = llformat("VFile %s:%s", id_string.c_str(), LLAssetType::lookup(mType)); - LL_INFOS() << "Requesting " << mName << LL_ENDL; + LL_INFOS("Xfer") << "Requesting " << mName << LL_ENDL; if (mBuffer) { @@ -131,6 +143,7 @@ S32 LLXfer_VFile::initializeRequest(U64 xfer_id, mBufferLength = 0; mPacketNum = 0; mTempID.generate(); + mDeleteTempFile = TRUE; mStatus = e_LL_XFER_PENDING; return retval; } @@ -140,7 +153,8 @@ S32 LLXfer_VFile::initializeRequest(U64 xfer_id, S32 LLXfer_VFile::startDownload() { S32 retval = 0; // presume success - LLVFile file(mVFS, mTempID, mType, LLVFile::APPEND); + + // Don't need to create the file here, it will happen when data arrives gMessageSystem->newMessageFast(_PREHASH_RequestXfer); gMessageSystem->nextBlockFast(_PREHASH_XferID); @@ -184,6 +198,8 @@ S32 LLXfer_VFile::startSend (U64 xfer_id, const LLHost &remote_host) if (mVFile->getSize() <= 0) { + LL_WARNS("Xfer") << "LLXfer_VFile::startSend() VFS file " << mLocalID << "." << LLAssetType::lookup(mType) + << " has unexpected file size of " << mVFile->getSize() << LL_ENDL; delete mVFile; mVFile = NULL; @@ -198,6 +214,7 @@ S32 LLXfer_VFile::startSend (U64 xfer_id, const LLHost &remote_host) } else { + LL_WARNS("Xfer") << "LLXfer_VFile::startSend() can't read VFS file " << mLocalID << "." << LLAssetType::lookup(mType) << LL_ENDL; retval = LL_ERR_FILE_NOT_FOUND; } @@ -205,6 +222,41 @@ S32 LLXfer_VFile::startSend (U64 xfer_id, const LLHost &remote_host) } /////////////////////////////////////////////////////////// + +void LLXfer_VFile::closeFileHandle() +{ + if (mVFile) + { + delete mVFile; + mVFile = NULL; + } +} + +/////////////////////////////////////////////////////////// + +S32 LLXfer_VFile::reopenFileHandle() +{ + S32 retval = LL_ERR_NOERR; // presume success + + if (mVFile == NULL) + { + if (mVFS->getExists(mLocalID, mType)) + { + mVFile = new LLVFile(mVFS, mLocalID, mType, LLVFile::READ); + } + else + { + LL_WARNS("Xfer") << "LLXfer_VFile::reopenFileHandle() can't read VFS file " << mLocalID << "." << LLAssetType::lookup(mType) << LL_ENDL; + retval = LL_ERR_FILE_NOT_FOUND; + } + } + + return retval; +} + + +/////////////////////////////////////////////////////////// + void LLXfer_VFile::setXferSize (S32 xfer_size) { LLXfer::setXferSize(xfer_size); @@ -236,8 +288,8 @@ S32 LLXfer_VFile::suck(S32 start_position) // grab a buffer from the right place in the file if (! mVFile->seek(start_position, 0)) { - LL_WARNS() << "VFile Xfer Can't seek to position " << start_position << ", file length " << mVFile->getSize() << LL_ENDL; - LL_WARNS() << "While sending file " << mLocalID << LL_ENDL; + LL_WARNS("Xfer") << "VFile Xfer Can't seek to position " << start_position << ", file length " << mVFile->getSize() << LL_ENDL; + LL_WARNS("Xfer") << "While sending file " << mLocalID << LL_ENDL; return -1; } @@ -288,12 +340,31 @@ S32 LLXfer_VFile::processEOF() if (!mCallbackResult) { - LLVFile file(mVFS, mTempID, mType, LLVFile::WRITE); - if (! file.rename(mLocalID, mType)) + if (mVFS->getExists(mTempID, mType)) { - LL_INFOS() << "copy from temp file failed: unable to rename to " << mLocalID << LL_ENDL; + LLVFile file(mVFS, mTempID, mType, LLVFile::WRITE); + if (!file.rename(mLocalID, mType)) + { + LL_WARNS("Xfer") << "VFS rename of temp file failed: unable to rename " << mTempID << " to " << mLocalID << LL_ENDL; + } + else + { + #ifdef VFS_SPAM + // Debugging spam + LL_INFOS("Xfer") << "VFS rename of temp file done: renamed " << mTempID << " to " << mLocalID + << " LLVFile size is " << file.getSize() + << LL_ENDL; + #endif + + // Rename worked: the original file is gone. Clear mDeleteTempFile + // so we don't attempt to delete the file in cleanup() + mDeleteTempFile = FALSE; + } + } + else + { + LL_WARNS("Xfer") << "LLXfer_VFile::processEOF() can't open for renaming VFS file " << mTempID << "." << LLAssetType::lookup(mType) << LL_ENDL; } - } if (mVFile) @@ -336,3 +407,4 @@ U32 LLXfer_VFile::getXferTypeTag() { return LLXfer::XFER_VFILE; } + diff --git a/indra/llmessage/llxfer_vfile.h b/indra/llmessage/llxfer_vfile.h index 048bf49dcc..5bf9a5cfba 100644 --- a/indra/llmessage/llxfer_vfile.h +++ b/indra/llmessage/llxfer_vfile.h @@ -47,6 +47,8 @@ class LLXfer_VFile : public LLXfer std::string mName; + BOOL mDeleteTempFile; + public: LLXfer_VFile (); LLXfer_VFile (LLVFS *vfs, const LLUUID &local_id, LLAssetType::EType type); @@ -66,8 +68,10 @@ class LLXfer_VFile : public LLXfer virtual S32 startDownload(); virtual S32 processEOF(); - - virtual S32 startSend (U64 xfer_id, const LLHost &remote_host); + + virtual S32 startSend(U64 xfer_id, const LLHost &remote_host); + virtual void closeFileHandle(); + virtual S32 reopenFileHandle(); virtual S32 suck(S32 start_position); virtual S32 flush(); diff --git a/indra/llmessage/llxfermanager.cpp b/indra/llmessage/llxfermanager.cpp index 2ceb64ce8f..4cea886c8a 100644 --- a/indra/llmessage/llxfermanager.cpp +++ b/indra/llmessage/llxfermanager.cpp @@ -44,9 +44,15 @@ const S32 LL_PACKET_RETRY_LIMIT = 10; // packet retransmission limit const S32 LL_DEFAULT_MAX_SIMULTANEOUS_XFERS = 10; const S32 LL_DEFAULT_MAX_REQUEST_FIFO_XFERS = 1000; -#define LL_XFER_PROGRESS_MESSAGES 0 -#define LL_XFER_TEST_REXMIT 0 +// Kills the connection if a viewer download queue hits this many requests backed up +// Also set in simulator.xml at "hard_limit_outgoing_xfers_per_circuit" +const S32 LL_DEFAULT_MAX_HARD_LIMIT_SIMULTANEOUS_XFERS = 500; +// Use this to show sending some ConfirmXferPacket messages +//#define LL_XFER_PROGRESS_MESSAGES 1 + +// Use this for lots of diagnostic spam +//#define LL_XFER_DIAGNOISTIC_LOGGING 1 /////////////////////////////////////////////////////////// @@ -66,10 +72,10 @@ LLXferManager::~LLXferManager () void LLXferManager::init (LLVFS *vfs) { - mSendList = NULL; - mReceiveList = NULL; + cleanup(); setMaxOutgoingXfersPerCircuit(LL_DEFAULT_MAX_SIMULTANEOUS_XFERS); + setHardLimitOutgoingXfersPerCircuit(LL_DEFAULT_MAX_HARD_LIMIT_SIMULTANEOUS_XFERS); setMaxIncomingXfers(LL_DEFAULT_MAX_REQUEST_FIFO_XFERS); mVFS = vfs; @@ -83,29 +89,14 @@ void LLXferManager::init (LLVFS *vfs) void LLXferManager::cleanup () { - LLXfer *xferp; - LLXfer *delp; - for_each(mOutgoingHosts.begin(), mOutgoingHosts.end(), DeletePointer()); mOutgoingHosts.clear(); - delp = mSendList; - while (delp) - { - xferp = delp->mNext; - delete delp; - delp = xferp; - } - mSendList = NULL; + for_each(mSendList.begin(), mSendList.end(), DeletePointer()); + mSendList.clear(); - delp = mReceiveList; - while (delp) - { - xferp = delp->mNext; - delete delp; - delp = xferp; - } - mReceiveList = NULL; + for_each(mReceiveList.begin(), mReceiveList.end(), DeletePointer()); + mReceiveList.clear(); } /////////////////////////////////////////////////////////// @@ -122,6 +113,11 @@ void LLXferManager::setMaxOutgoingXfersPerCircuit(S32 max_num) mMaxOutgoingXfersPerCircuit = max_num; } +void LLXferManager::setHardLimitOutgoingXfersPerCircuit(S32 max_num) +{ + mHardLimitOutgoingXfersPerCircuit = max_num; +} + void LLXferManager::setUseAckThrottling(const BOOL use) { mUseAckThrottling = use; @@ -140,6 +136,11 @@ void LLXferManager::setAckThrottleBPS(const F32 bps) F32 actual_rate = llmax(min_bps*1.1f, bps); LL_DEBUGS("AppInit") << "LLXferManager ack throttle min rate: " << min_bps << LL_ENDL; LL_DEBUGS("AppInit") << "LLXferManager ack throttle actual rate: " << actual_rate << LL_ENDL; + #ifdef LL_XFER_DIAGNOISTIC_LOGGING + LL_INFOS("Xfer") << "LLXferManager ack throttle min rate: " << min_bps << LL_ENDL; + LL_INFOS("Xfer") << "LLXferManager ack throttle actual rate: " << actual_rate << LL_ENDL; + #endif // LL_XFER_DIAGNOISTIC_LOGGING + mAckThrottle.setRate(actual_rate); } @@ -148,45 +149,71 @@ void LLXferManager::setAckThrottleBPS(const F32 bps) void LLXferManager::updateHostStatus() { - LLXfer *xferp; - LLHostStatus *host_statusp = NULL; - + // Clear the outgoing host list for_each(mOutgoingHosts.begin(), mOutgoingHosts.end(), DeletePointer()); mOutgoingHosts.clear(); - - for (xferp = mSendList; xferp; xferp = xferp->mNext) + + // Loop through all outgoing xfers and re-build mOutgoingHosts + for (xfer_list_t::iterator send_iter = mSendList.begin(); + send_iter != mSendList.end(); ++send_iter) { + LLHostStatus *host_statusp = NULL; for (status_list_t::iterator iter = mOutgoingHosts.begin(); iter != mOutgoingHosts.end(); ++iter) { - host_statusp = *iter; - if (host_statusp->mHost == xferp->mRemoteHost) - { + if ((*iter)->mHost == (*send_iter)->mRemoteHost) + { // Already have this host + host_statusp = *iter; break; } } if (!host_statusp) - { + { // Don't have this host, so add it host_statusp = new LLHostStatus(); if (host_statusp) { - host_statusp->mHost = xferp->mRemoteHost; + host_statusp->mHost = (*send_iter)->mRemoteHost; mOutgoingHosts.push_front(host_statusp); } } if (host_statusp) - { - if (xferp->mStatus == e_LL_XFER_PENDING) + { // Do the accounting + if ((*send_iter)->mStatus == e_LL_XFER_PENDING) { host_statusp->mNumPending++; } - else if (xferp->mStatus == e_LL_XFER_IN_PROGRESS) + else if ((*send_iter)->mStatus == e_LL_XFER_IN_PROGRESS) { host_statusp->mNumActive++; } } - } + +#ifdef LL_XFER_DIAGNOISTIC_LOGGING + for (xfer_list_t::iterator send_iter = mSendList.begin(); + send_iter != mSendList.end(); ++send_iter) + { + LLXfer * xferp = *send_iter; + LL_INFOS("Xfer") << "xfer to host " << xferp->mRemoteHost + << " is " << xferp->mXferSize << " bytes" + << ", status " << (S32)(xferp->mStatus) + << ", waiting for ACK: " << (S32)(xferp->mWaitingForACK) + << " in frame " << (S32) LLFrameTimer::getFrameCount() + << LL_ENDL; + } + + for (status_list_t::iterator iter = mOutgoingHosts.begin(); + iter != mOutgoingHosts.end(); ++iter) + { + LL_INFOS("Xfer") << "LLXfer host " << (*iter)->mHost.getIPandPort() + << " has " << (*iter)->mNumActive + << " active, " << (*iter)->mNumPending + << " pending" + << " in frame " << (S32) LLFrameTimer::getFrameCount() + << LL_ENDL; + } +#endif // LL_XFER_DIAGNOISTIC_LOGGING + } /////////////////////////////////////////////////////////// @@ -196,27 +223,28 @@ void LLXferManager::printHostStatus() LLHostStatus *host_statusp = NULL; if (!mOutgoingHosts.empty()) { - LL_INFOS() << "Outgoing Xfers:" << LL_ENDL; + LL_INFOS("Xfer") << "Outgoing Xfers:" << LL_ENDL; for (status_list_t::iterator iter = mOutgoingHosts.begin(); iter != mOutgoingHosts.end(); ++iter) { host_statusp = *iter; - LL_INFOS() << " " << host_statusp->mHost << " active: " << host_statusp->mNumActive << " pending: " << host_statusp->mNumPending << LL_ENDL; + LL_INFOS("Xfer") << " " << host_statusp->mHost << " active: " << host_statusp->mNumActive << " pending: " << host_statusp->mNumPending << LL_ENDL; } } } /////////////////////////////////////////////////////////// -LLXfer *LLXferManager::findXfer (U64 id, LLXfer *list_head) +LLXfer * LLXferManager::findXferByID(U64 id, xfer_list_t & xfer_list) { - LLXfer *xferp; - for (xferp = list_head; xferp; xferp = xferp->mNext) + for (xfer_list_t::iterator iter = xfer_list.begin(); + iter != xfer_list.end(); + ++iter) { - if (xferp->mID == id) + if ((*iter)->mID == id) { - return(xferp); + return(*iter); } } return(NULL); @@ -225,29 +253,34 @@ LLXfer *LLXferManager::findXfer (U64 id, LLXfer *list_head) /////////////////////////////////////////////////////////// -void LLXferManager::removeXfer (LLXfer *delp, LLXfer **list_head) +// WARNING: this invalidates iterators from xfer_list +void LLXferManager::removeXfer(LLXfer *delp, xfer_list_t & xfer_list) { - // This function assumes that delp will only occur in the list - // zero or one times. if (delp) - { - if (*list_head == delp) + { + std::string direction = "send"; + if (&xfer_list == &mReceiveList) { - *list_head = delp->mNext; - delete (delp); + direction = "receive"; } - else + + // This assumes that delp will occur in the list once at most + // Find the pointer in the list + for (xfer_list_t::iterator iter = xfer_list.begin(); + iter != xfer_list.end(); + ++iter) { - LLXfer *xferp = *list_head; - while (xferp->mNext) + if ((*iter) == delp) { - if (xferp->mNext == delp) - { - xferp->mNext = delp->mNext; - delete (delp); - break; - } - xferp = xferp->mNext; + LL_DEBUGS("Xfer") << "Deleting xfer to host " << (*iter)->mRemoteHost + << " of " << (*iter)->mXferSize << " bytes" + << ", status " << (S32)((*iter)->mStatus) + << " from the " << direction << " list" + << LL_ENDL; + + xfer_list.erase(iter); + delete (delp); + break; } } } @@ -255,35 +288,30 @@ void LLXferManager::removeXfer (LLXfer *delp, LLXfer **list_head) /////////////////////////////////////////////////////////// -U32 LLXferManager::numActiveListEntries(LLXfer *list_head) +LLHostStatus * LLXferManager::findHostStatus(const LLHost &host) { - U32 num_entries = 0; + LLHostStatus *host_statusp = NULL; - while (list_head) + for (status_list_t::iterator iter = mOutgoingHosts.begin(); + iter != mOutgoingHosts.end(); ++iter) { - if (list_head->mStatus == e_LL_XFER_IN_PROGRESS) + host_statusp = *iter; + if (host_statusp->mHost == host) { - num_entries++; + return (host_statusp); } - list_head = list_head->mNext; } - return(num_entries); + return 0; } /////////////////////////////////////////////////////////// - + S32 LLXferManager::numPendingXfers(const LLHost &host) { - LLHostStatus *host_statusp = NULL; - - for (status_list_t::iterator iter = mOutgoingHosts.begin(); - iter != mOutgoingHosts.end(); ++iter) + LLHostStatus *host_statusp = findHostStatus(host); + if (host_statusp) { - host_statusp = *iter; - if (host_statusp->mHost == host) - { - return (host_statusp->mNumPending); - } + return host_statusp->mNumPending; } return 0; } @@ -292,16 +320,10 @@ S32 LLXferManager::numPendingXfers(const LLHost &host) S32 LLXferManager::numActiveXfers(const LLHost &host) { - LLHostStatus *host_statusp = NULL; - - for (status_list_t::iterator iter = mOutgoingHosts.begin(); - iter != mOutgoingHosts.end(); ++iter) + LLHostStatus *host_statusp = findHostStatus(host); + if (host_statusp) { - host_statusp = *iter; - if (host_statusp->mHost == host) - { - return (host_statusp->mNumActive); - } + return host_statusp->mNumActive; } return 0; } @@ -372,35 +394,6 @@ BOOL LLXferManager::isLastPacket(S32 packet_num) /////////////////////////////////////////////////////////// -U64 LLXferManager::registerXfer(const void *datap, const S32 length) -{ - LLXfer *xferp; - U64 xfer_id = getNextID(); - - xferp = (LLXfer *) new LLXfer_Mem(); - if (xferp) - { - xferp->mNext = mSendList; - mSendList = xferp; - - xfer_id = ((LLXfer_Mem *)xferp)->registerXfer(xfer_id, datap,length); - - if (!xfer_id) - { - removeXfer(xferp,&mSendList); - } - } - else - { - LL_ERRS() << "Xfer allocation error" << LL_ENDL; - xfer_id = 0; - } - - return(xfer_id); -} - -/////////////////////////////////////////////////////////// - U64 LLXferManager::requestFile(const std::string& local_filename, const std::string& remote_filename, ELLPath remote_path, @@ -411,43 +404,46 @@ U64 LLXferManager::requestFile(const std::string& local_filename, BOOL is_priority, BOOL use_big_packets) { - LLXfer *xferp; + LLXfer_File* file_xfer_p = NULL; - for (xferp = mReceiveList; xferp ; xferp = xferp->mNext) + // First check to see if it's already requested + for (xfer_list_t::iterator iter = mReceiveList.begin(); + iter != mReceiveList.end(); ++iter) { - if (xferp->getXferTypeTag() == LLXfer::XFER_FILE - && (((LLXfer_File*)xferp)->matchesLocalFilename(local_filename)) - && (((LLXfer_File*)xferp)->matchesRemoteFilename(remote_filename, remote_path)) - && (remote_host == xferp->mRemoteHost) - && (callback == xferp->mCallback) - && (user_data == xferp->mCallbackDataHandle)) - + if ((*iter)->getXferTypeTag() == LLXfer::XFER_FILE) { - // cout << "requested a xfer already in progress" << endl; - return xferp->mID; + file_xfer_p = (LLXfer_File*)(*iter); + if (file_xfer_p->matchesLocalFilename(local_filename) + && file_xfer_p->matchesRemoteFilename(remote_filename, remote_path) + && (remote_host == file_xfer_p->mRemoteHost) + && (callback == file_xfer_p->mCallback) + && (user_data == file_xfer_p->mCallbackDataHandle)) + { + // Already have the request (already in progress) + return (*iter)->mID; + } } } U64 xfer_id = 0; S32 chunk_size = use_big_packets ? LL_XFER_LARGE_PAYLOAD : -1; - xferp = (LLXfer *) new LLXfer_File(chunk_size); - if (xferp) + file_xfer_p = new LLXfer_File(chunk_size); + if (file_xfer_p) { - addToList(xferp, mReceiveList, is_priority); + addToList(file_xfer_p, mReceiveList, is_priority); // Remove any file by the same name that happens to be lying // around. // Note: according to AaronB, this is here to deal with locks on files that were // in transit during a crash, - if( delete_remote_on_completion - && (remote_filename.substr(remote_filename.length()-4) == ".tmp") - && gDirUtilp->fileExists(local_filename)) + if(delete_remote_on_completion && + (remote_filename.substr(remote_filename.length()-4) == ".tmp")) { LLFile::remove(local_filename, ENOENT); } xfer_id = getNextID(); - ((LLXfer_File *)xferp)->initializeRequest( + file_xfer_p->initializeRequest( xfer_id, local_filename, remote_filename, @@ -459,39 +455,11 @@ U64 LLXferManager::requestFile(const std::string& local_filename, } else { - LL_ERRS() << "Xfer allocation error" << LL_ENDL; + LL_ERRS("Xfer") << "Xfer allocation error" << LL_ENDL; } return xfer_id; } -void LLXferManager::requestFile(const std::string& remote_filename, - ELLPath remote_path, - const LLHost& remote_host, - BOOL delete_remote_on_completion, - void (*callback)(void*,S32,void**,S32,LLExtStat), - void** user_data, - BOOL is_priority) -{ - LLXfer *xferp; - - xferp = (LLXfer *) new LLXfer_Mem(); - if (xferp) - { - addToList(xferp, mReceiveList, is_priority); - ((LLXfer_Mem *)xferp)->initializeRequest(getNextID(), - remote_filename, - remote_path, - remote_host, - delete_remote_on_completion, - callback, user_data); - startPendingDownloads(); - } - else - { - LL_ERRS() << "Xfer allocation error" << LL_ENDL; - } -} - void LLXferManager::requestVFile(const LLUUID& local_id, const LLUUID& remote_id, LLAssetType::EType type, LLVFS* vfs, @@ -500,28 +468,46 @@ void LLXferManager::requestVFile(const LLUUID& local_id, void** user_data, BOOL is_priority) { - LLXfer *xferp; - - for (xferp = mReceiveList; xferp ; xferp = xferp->mNext) - { - if (xferp->getXferTypeTag() == LLXfer::XFER_VFILE - && (((LLXfer_VFile*)xferp)->matchesLocalFile(local_id, type)) - && (((LLXfer_VFile*)xferp)->matchesRemoteFile(remote_id, type)) - && (remote_host == xferp->mRemoteHost) - && (callback == xferp->mCallback) - && (user_data == xferp->mCallbackDataHandle)) + LLXfer_VFile * xfer_p = NULL; + for (xfer_list_t::iterator iter = mReceiveList.begin(); + iter != mReceiveList.end(); ++iter) + { // Find any matching existing requests + if ((*iter)->getXferTypeTag() == LLXfer::XFER_VFILE) { - // cout << "requested a xfer already in progress" << endl; - return; + xfer_p = (LLXfer_VFile*) (*iter); + if (xfer_p->matchesLocalFile(local_id, type) + && xfer_p->matchesRemoteFile(remote_id, type) + && (remote_host == xfer_p->mRemoteHost) + && (callback == xfer_p->mCallback) + && (user_data == xfer_p->mCallbackDataHandle)) + + { // Have match, don't add a duplicate + #ifdef LL_XFER_DIAGNOISTIC_LOGGING + LL_INFOS("Xfer") << "Dropping duplicate xfer request for " << remote_id + << " on " << remote_host.getIPandPort() + << " local id " << local_id + << LL_ENDL; + #endif // LL_XFER_DIAGNOISTIC_LOGGING + + return; + } } } - xferp = (LLXfer *) new LLXfer_VFile(); - if (xferp) + xfer_p = new LLXfer_VFile(); + if (xfer_p) { - addToList(xferp, mReceiveList, is_priority); - ((LLXfer_VFile *)xferp)->initializeRequest(getNextID(), + #ifdef LL_XFER_DIAGNOISTIC_LOGGING + LL_INFOS("Xfer") << "Starting file xfer for " << remote_id + << " type " << LLAssetType::lookupHumanReadable(type) + << " from " << xfer_p->mRemoteHost.getIPandPort() + << ", local id " << local_id + << LL_ENDL; + #endif // LL_XFER_DIAGNOISTIC_LOGGING + + addToList(xfer_p, mReceiveList, is_priority); + ((LLXfer_VFile *)xfer_p)->initializeRequest(getNextID(), vfs, local_id, remote_id, @@ -533,78 +519,18 @@ void LLXferManager::requestVFile(const LLUUID& local_id, } else { - LL_ERRS() << "Xfer allocation error" << LL_ENDL; - } - -} - -/* -void LLXferManager::requestXfer( - const std::string& local_filename, - BOOL delete_remote_on_completion, - U64 xfer_id, - const LLHost &remote_host, - void (*callback)(void **,S32), - void **user_data) -{ - LLXfer *xferp; - - for (xferp = mReceiveList; xferp ; xferp = xferp->mNext) - { - if (xferp->getXferTypeTag() == LLXfer::XFER_FILE - && (((LLXfer_File*)xferp)->matchesLocalFilename(local_filename)) - && (xfer_id == xferp->mID) - && (remote_host == xferp->mRemoteHost) - && (callback == xferp->mCallback) - && (user_data == xferp->mCallbackDataHandle)) - - { - // cout << "requested a xfer already in progress" << endl; - return; - } + LL_ERRS("Xfer") << "Xfer allocation error" << LL_ENDL; } - xferp = (LLXfer *) new LLXfer_File(); - if (xferp) - { - xferp->mNext = mReceiveList; - mReceiveList = xferp; - - ((LLXfer_File *)xferp)->initializeRequest(xfer_id,local_filename,"",LL_PATH_NONE,remote_host,delete_remote_on_completion,callback,user_data); - startPendingDownloads(); - } - else - { - LL_ERRS() << "Xfer allcoation error" << LL_ENDL; - } } -void LLXferManager::requestXfer(U64 xfer_id, const LLHost &remote_host, BOOL delete_remote_on_completion, void (*callback)(void *,S32,void **,S32),void **user_data) -{ - LLXfer *xferp; - - xferp = (LLXfer *) new LLXfer_Mem(); - if (xferp) - { - xferp->mNext = mReceiveList; - mReceiveList = xferp; - - ((LLXfer_Mem *)xferp)->initializeRequest(xfer_id,"",LL_PATH_NONE,remote_host,delete_remote_on_completion,callback,user_data); - startPendingDownloads(); - } - else - { - LL_ERRS() << "Xfer allcoation error" << LL_ENDL; - } -} -*/ /////////////////////////////////////////////////////////// void LLXferManager::processReceiveData (LLMessageSystem *mesgsys, void ** /*user_data*/) { // there's sometimes an extra 4 bytes added to an xfer payload const S32 BUF_SIZE = LL_XFER_LARGE_PAYLOAD + 4; - char fdata_buf[LL_XFER_LARGE_PAYLOAD + 4]; /* Flawfinder : ignore */ + char fdata_buf[BUF_SIZE]; /* Flawfinder : ignore */ S32 fdata_size; U64 id; S32 packetnum; @@ -614,14 +540,24 @@ void LLXferManager::processReceiveData (LLMessageSystem *mesgsys, void ** /*user mesgsys->getS32Fast(_PREHASH_XferID, _PREHASH_Packet, packetnum); fdata_size = mesgsys->getSizeFast(_PREHASH_DataPacket,_PREHASH_Data); - mesgsys->getBinaryDataFast(_PREHASH_DataPacket, _PREHASH_Data, fdata_buf, 0, 0, BUF_SIZE); - - xferp = findXfer(id, mReceiveList); + if (fdata_size < 0 || + fdata_size > BUF_SIZE) + { + char U64_BUF[MAX_STRING]; /* Flawfinder : ignore */ + LL_WARNS("Xfer") << "Received invalid xfer data size of " << fdata_size + << " in packet number " << packetnum + << " from " << mesgsys->getSender() + << " for xfer id: " << U64_to_str(id, U64_BUF, sizeof(U64_BUF)) + << LL_ENDL; + return; + } + mesgsys->getBinaryDataFast(_PREHASH_DataPacket, _PREHASH_Data, fdata_buf, fdata_size, 0, BUF_SIZE); - if (!xferp) + xferp = findXferByID(id, mReceiveList); + if (!xferp) { char U64_BUF[MAX_STRING]; /* Flawfinder : ignore */ - LL_INFOS() << "received xfer data from " << mesgsys->getSender() + LL_WARNS("Xfer") << "received xfer data from " << mesgsys->getSender() << " for non-existent xfer id: " << U64_to_str(id, U64_BUF, sizeof(U64_BUF)) << LL_ENDL; return; @@ -634,11 +570,11 @@ void LLXferManager::processReceiveData (LLMessageSystem *mesgsys, void ** /*user // confirm it if it was a resend of the last one, since the confirmation might have gotten dropped if (decodePacketNum(packetnum) == (xferp->mPacketNum - 1)) { - LL_INFOS() << "Reconfirming xfer " << xferp->mRemoteHost << ":" << xferp->getFileName() << " packet " << packetnum << LL_ENDL; sendConfirmPacket(mesgsys, id, decodePacketNum(packetnum), mesgsys->getSender()); + LL_INFOS("Xfer") << "Reconfirming xfer " << xferp->mRemoteHost << ":" << xferp->getFileName() << " packet " << packetnum << LL_ENDL; sendConfirmPacket(mesgsys, id, decodePacketNum(packetnum), mesgsys->getSender()); } else { - LL_INFOS() << "Ignoring xfer " << xferp->mRemoteHost << ":" << xferp->getFileName() << " recv'd packet " << packetnum << "; expecting " << xferp->mPacketNum << LL_ENDL; + LL_INFOS("Xfer") << "Ignoring xfer " << xferp->mRemoteHost << ":" << xferp->getFileName() << " recv'd packet " << packetnum << "; expecting " << xferp->mPacketNum << LL_ENDL; } return; } @@ -663,7 +599,7 @@ void LLXferManager::processReceiveData (LLMessageSystem *mesgsys, void ** /*user if (result == LL_ERR_CANNOT_OPEN_FILE) { xferp->abort(LL_ERR_CANNOT_OPEN_FILE); - removeXfer(xferp,&mReceiveList); + removeXfer(xferp,mReceiveList); startPendingDownloads(); return; } @@ -688,7 +624,7 @@ void LLXferManager::processReceiveData (LLMessageSystem *mesgsys, void ** /*user if (isLastPacket(packetnum)) { xferp->processEOF(); - removeXfer(xferp,&mReceiveList); + removeXfer(xferp,mReceiveList); startPendingDownloads(); } } @@ -697,7 +633,7 @@ void LLXferManager::processReceiveData (LLMessageSystem *mesgsys, void ** /*user void LLXferManager::sendConfirmPacket (LLMessageSystem *mesgsys, U64 id, S32 packetnum, const LLHost &remote_host) { -#if LL_XFER_PROGRESS_MESSAGES +#ifdef LL_XFER_PROGRESS_MESSAGES if (!(packetnum % 50)) { cout << "confirming xfer packet #" << packetnum << endl; @@ -708,6 +644,7 @@ void LLXferManager::sendConfirmPacket (LLMessageSystem *mesgsys, U64 id, S32 pac mesgsys->addU64Fast(_PREHASH_ID, id); mesgsys->addU32Fast(_PREHASH_Packet, packetnum); + // Ignore a circuit failure here, we'll catch it with another message mesgsys->sendMessage(remote_host); } @@ -746,6 +683,28 @@ bool LLXferManager::validateFileForTransfer(const std::string& filename) return find_and_remove(mExpectedTransfers, filename); } +/* Present in fireengine, not used by viewer +void LLXferManager::expectVFileForRequest(const std::string& filename) +{ + mExpectedVFileRequests.insert(filename); +} + +bool LLXferManager::validateVFileForRequest(const std::string& filename) +{ + return find_and_remove(mExpectedVFileRequests, filename); +} + +void LLXferManager::expectVFileForTransfer(const std::string& filename) +{ + mExpectedVFileTransfers.insert(filename); +} + +bool LLXferManager::validateVFileForTransfer(const std::string& filename) +{ + return find_and_remove(mExpectedVFileTransfers, filename); +} +*/ + static bool remove_prefix(std::string& filename, const std::string& prefix) { if (std::equal(prefix.begin(), prefix.end(), filename.begin())) @@ -807,7 +766,7 @@ void LLXferManager::processFileRequest (LLMessageSystem *mesgsys, void ** /*user mesgsys->getU64Fast(_PREHASH_XferID, _PREHASH_ID, id); char U64_BUF[MAX_STRING]; /* Flawfinder : ignore */ - LL_INFOS() << "xfer request id: " << U64_to_str(id, U64_BUF, sizeof(U64_BUF)) + LL_INFOS("Xfer") << "xfer request id: " << U64_to_str(id, U64_BUF, sizeof(U64_BUF)) << " to " << mesgsys->getSender() << LL_ENDL; mesgsys->getStringFast(_PREHASH_XferID, _PREHASH_Filename, local_filename); @@ -825,36 +784,45 @@ void LLXferManager::processFileRequest (LLMessageSystem *mesgsys, void ** /*user LLXfer *xferp; if (uuid != LLUUID::null) - { + { // Request for an asset - use a VFS file if(NULL == LLAssetType::lookup(type)) { - LL_WARNS() << "Invalid type for xfer request: " << uuid << ":" + LL_WARNS("Xfer") << "Invalid type for xfer request: " << uuid << ":" << type_s16 << " to " << mesgsys->getSender() << LL_ENDL; return; } - LL_INFOS() << "starting vfile transfer: " << uuid << "," << LLAssetType::lookup(type) << " to " << mesgsys->getSender() << LL_ENDL; - if (! mVFS) { - LL_WARNS() << "Attempt to send VFile w/o available VFS" << LL_ENDL; + LL_WARNS("Xfer") << "Attempt to send VFile w/o available VFS" << LL_ENDL; return; } + /* Present in fireengine, not used by viewer + if (!validateVFileForTransfer(uuid.asString())) + { + // it is up to the app sending the file to mark it for expected + // transfer before the request arrives or it will be dropped + LL_WARNS("Xfer") << "SECURITY: Unapproved VFile '" << uuid << "'" << LL_ENDL; + return; + } + */ + + LL_INFOS("Xfer") << "starting vfile transfer: " << uuid << "," << LLAssetType::lookup(type) << " to " << mesgsys->getSender() << LL_ENDL; + xferp = (LLXfer *)new LLXfer_VFile(mVFS, uuid, type); if (xferp) { - xferp->mNext = mSendList; - mSendList = xferp; + mSendList.push_front(xferp); result = xferp->startSend(id,mesgsys->getSender()); } else { - LL_ERRS() << "Xfer allcoation error" << LL_ENDL; + LL_ERRS("Xfer") << "Xfer allcoation error" << LL_ENDL; } } else if (!local_filename.empty()) - { + { // Was given a file name to send // See DEV-21775 for detailed security issues if (local_path == LL_PATH_NONE) @@ -873,7 +841,7 @@ void LLXferManager::processFileRequest (LLMessageSystem *mesgsys, void ** /*user case LL_PATH_NONE: if(!validateFileForTransfer(local_filename)) { - LL_WARNS() << "SECURITY: Unapproved filename '" << local_filename << LL_ENDL; + LL_WARNS("Xfer") << "SECURITY: Unapproved filename '" << local_filename << LL_ENDL; return; } break; @@ -881,13 +849,13 @@ void LLXferManager::processFileRequest (LLMessageSystem *mesgsys, void ** /*user case LL_PATH_CACHE: if(!verify_cache_filename(local_filename)) { - LL_WARNS() << "SECURITY: Illegal cache filename '" << local_filename << LL_ENDL; + LL_WARNS("Xfer") << "SECURITY: Illegal cache filename '" << local_filename << LL_ENDL; return; } break; default: - LL_WARNS() << "SECURITY: Restricted file dir enum: " << (U32)local_path << LL_ENDL; + LL_WARNS("Xfer") << "SECURITY: Restricted file dir enum: " << (U32)local_path << LL_ENDL; return; } @@ -902,7 +870,7 @@ void LLXferManager::processFileRequest (LLMessageSystem *mesgsys, void ** /*user { expanded_filename = local_filename; } - LL_INFOS() << "starting file transfer: " << expanded_filename << " to " << mesgsys->getSender() << LL_ENDL; + LL_INFOS("Xfer") << "starting file transfer: " << expanded_filename << " to " << mesgsys->getSender() << LL_ENDL; BOOL delete_local_on_completion = FALSE; mesgsys->getBOOL("XferID", "DeleteOnCompletion", delete_local_on_completion); @@ -912,23 +880,22 @@ void LLXferManager::processFileRequest (LLMessageSystem *mesgsys, void ** /*user if (xferp) { - xferp->mNext = mSendList; - mSendList = xferp; + mSendList.push_front(xferp); result = xferp->startSend(id,mesgsys->getSender()); } else { - LL_ERRS() << "Xfer allcoation error" << LL_ENDL; + LL_ERRS("Xfer") << "Xfer allcoation error" << LL_ENDL; } } else - { + { // no uuid or filename - use the ID sent char U64_BUF[MAX_STRING]; /* Flawfinder : ignore */ - LL_INFOS() << "starting memory transfer: " + LL_INFOS("Xfer") << "starting memory transfer: " << U64_to_str(id, U64_BUF, sizeof(U64_BUF)) << " to " << mesgsys->getSender() << LL_ENDL; - xferp = findXfer(id, mSendList); + xferp = findXferByID(id, mSendList); if (xferp) { @@ -936,7 +903,7 @@ void LLXferManager::processFileRequest (LLMessageSystem *mesgsys, void ** /*user } else { - LL_INFOS() << "Warning: " << U64_BUF << " not found." << LL_ENDL; + LL_INFOS("Xfer") << "Warning: xfer ID " << U64_BUF << " not found." << LL_ENDL; result = LL_ERR_FILE_NOT_FOUND; } } @@ -946,11 +913,11 @@ void LLXferManager::processFileRequest (LLMessageSystem *mesgsys, void ** /*user if (xferp) { xferp->abort(result); - removeXfer(xferp,&mSendList); + removeXfer(xferp, mSendList); } else // can happen with a memory transfer not found { - LL_INFOS() << "Aborting xfer to " << mesgsys->getSender() << " with error: " << result << LL_ENDL; + LL_INFOS("Xfer") << "Aborting xfer to " << mesgsys->getSender() << " with error: " << result << LL_ENDL; mesgsys->newMessageFast(_PREHASH_AbortXfer); mesgsys->nextBlockFast(_PREHASH_XferID); @@ -960,24 +927,86 @@ void LLXferManager::processFileRequest (LLMessageSystem *mesgsys, void ** /*user mesgsys->sendMessage(mesgsys->getSender()); } } - else if(xferp && (numActiveXfers(xferp->mRemoteHost) < mMaxOutgoingXfersPerCircuit)) + else if(xferp) { - xferp->sendNextPacket(); - changeNumActiveXfers(xferp->mRemoteHost,1); -// LL_INFOS() << "***STARTING XFER IMMEDIATELY***" << LL_ENDL; - } - else - { - if(xferp) + // Figure out how many transfers the host has requested + LLHostStatus *host_statusp = findHostStatus(xferp->mRemoteHost); + if (host_statusp) { - LL_INFOS() << " queueing xfer request, " << numPendingXfers(xferp->mRemoteHost) << " ahead of this one" << LL_ENDL; + if (host_statusp->mNumActive < mMaxOutgoingXfersPerCircuit) + { // Not many transfers in progress already, so start immediately + xferp->sendNextPacket(); + changeNumActiveXfers(xferp->mRemoteHost,1); + LL_DEBUGS("Xfer") << "Starting xfer ID " << U64_to_str(id) << " immediately" << LL_ENDL; + } + else if (mHardLimitOutgoingXfersPerCircuit == 0 || + (host_statusp->mNumActive + host_statusp->mNumPending) < mHardLimitOutgoingXfersPerCircuit) + { // Must close the file handle and wait for earlier ones to complete + LL_INFOS("Xfer") << " queueing xfer request id " << U64_to_str(id) << ", " + << host_statusp->mNumActive << " active and " + << host_statusp->mNumPending << " pending ahead of this one" + << LL_ENDL; + xferp->closeFileHandle(); // Close the file handle until we're ready to send again + } + else if (mHardLimitOutgoingXfersPerCircuit > 0) + { // Way too many requested ... it's time to stop being nice and kill the circuit + xferp->closeFileHandle(); // Close the file handle in any case + LLCircuitData *cdp = gMessageSystem->mCircuitInfo.findCircuit(xferp->mRemoteHost); + if (cdp) + { + if (cdp->getTrusted()) + { // Trusted internal circuit - don't kill it + LL_WARNS("Xfer") << "Trusted circuit to " << xferp->mRemoteHost << " has too many xfer requests in the queue " + << host_statusp->mNumActive << " active and " + << host_statusp->mNumPending << " pending ahead of this one" + << LL_ENDL; + } + else + { // Untrusted circuit - time to stop messing around and kill it + LL_WARNS("Xfer") << "Killing circuit to " << xferp->mRemoteHost << " for having too many xfer requests in the queue " + << host_statusp->mNumActive << " active and " + << host_statusp->mNumPending << " pending ahead of this one" + << LL_ENDL; + gMessageSystem->disableCircuit(xferp->mRemoteHost); + } + } + else + { // WTF? Why can't we find a circuit? Try to kill it off + LL_WARNS("Xfer") << "Backlog with circuit to " << xferp->mRemoteHost << " with too many xfer requests in the queue " + << host_statusp->mNumActive << " active and " + << host_statusp->mNumPending << " pending ahead of this one" + << " but no LLCircuitData found???" + << LL_ENDL; + gMessageSystem->disableCircuit(xferp->mRemoteHost); + } + } } else { - LL_WARNS() << "LLXferManager::processFileRequest() - no xfer found!" - << LL_ENDL; + LL_WARNS("Xfer") << "LLXferManager::processFileRequest() - no LLHostStatus found for id " << U64_to_str(id) + << " host " << xferp->mRemoteHost << LL_ENDL; } } + else + { + LL_WARNS("Xfer") << "LLXferManager::processFileRequest() - no xfer found for id " << U64_to_str(id) << LL_ENDL; + } +} + +/////////////////////////////////////////////////////////// + +// Return true if host is in a transfer-flood sitation. Same check for both internal and external hosts +bool LLXferManager::isHostFlooded(const LLHost & host) +{ + bool flooded = false; + LLHostStatus *host_statusp = findHostStatus(host); + if (host_statusp) + { + flooded = (mHardLimitOutgoingXfersPerCircuit > 0 && + (host_statusp->mNumActive + host_statusp->mNumPending) >= (S32)(mHardLimitOutgoingXfersPerCircuit * 0.8f)); + } + + return flooded; } @@ -991,7 +1020,7 @@ void LLXferManager::processConfirmation (LLMessageSystem *mesgsys, void ** /*use mesgsys->getU64Fast(_PREHASH_XferID, _PREHASH_ID, id); mesgsys->getS32Fast(_PREHASH_XferID, _PREHASH_Packet, packetNum); - LLXfer* xferp = findXfer(id, mSendList); + LLXfer* xferp = findXferByID(id, mSendList); if (xferp) { // cout << "confirmed packet #" << packetNum << " ping: "<< xferp->ACKTimer.getElapsedTimeF32() << endl; @@ -1002,91 +1031,105 @@ void LLXferManager::processConfirmation (LLMessageSystem *mesgsys, void ** /*use } else { - removeXfer(xferp, &mSendList); + removeXfer(xferp, mSendList); } } } /////////////////////////////////////////////////////////// -void LLXferManager::retransmitUnackedPackets () +// Called from LLMessageSystem::processAcks() +void LLXferManager::retransmitUnackedPackets() { LLXfer *xferp; - LLXfer *delp; - xferp = mReceiveList; - while(xferp) + + xfer_list_t::iterator iter = mReceiveList.begin(); + while (iter != mReceiveList.end()) { + xferp = (*iter); if (xferp->mStatus == e_LL_XFER_IN_PROGRESS) { // if the circuit dies, abort if (! gMessageSystem->mCircuitInfo.isCircuitAlive( xferp->mRemoteHost )) { - LL_INFOS() << "Xfer found in progress on dead circuit, aborting" << LL_ENDL; + LL_WARNS("Xfer") << "Xfer found in progress on dead circuit, aborting transfer to " + << xferp->mRemoteHost.getIPandPort() + << LL_ENDL; xferp->mCallbackResult = LL_ERR_CIRCUIT_GONE; xferp->processEOF(); - delp = xferp; - xferp = xferp->mNext; - removeXfer(delp,&mReceiveList); + + iter = mReceiveList.erase(iter); // iter is set to next one after the deletion point + delete (xferp); continue; } } - xferp = xferp->mNext; + ++iter; } - xferp = mSendList; + // Re-build mOutgoingHosts data updateHostStatus(); + F32 et; - while (xferp) + iter = mSendList.begin(); + while (iter != mSendList.end()) { + xferp = (*iter); if (xferp->mWaitingForACK && ( (et = xferp->ACKTimer.getElapsedTimeF32()) > LL_PACKET_TIMEOUT)) { if (xferp->mRetries > LL_PACKET_RETRY_LIMIT) { - LL_INFOS() << "dropping xfer " << xferp->mRemoteHost << ":" << xferp->getFileName() << " packet retransmit limit exceeded, xfer dropped" << LL_ENDL; + LL_INFOS("Xfer") << "dropping xfer " << xferp->mRemoteHost << ":" << xferp->getFileName() << " packet retransmit limit exceeded, xfer dropped" << LL_ENDL; xferp->abort(LL_ERR_TCP_TIMEOUT); - delp = xferp; - xferp = xferp->mNext; - removeXfer(delp,&mSendList); + iter = mSendList.erase(iter); + delete xferp; + continue; } else { - LL_INFOS() << "resending xfer " << xferp->mRemoteHost << ":" << xferp->getFileName() << " packet unconfirmed after: "<< et << " sec, packet " << xferp->mPacketNum << LL_ENDL; + LL_INFOS("Xfer") << "resending xfer " << xferp->mRemoteHost << ":" << xferp->getFileName() << " packet unconfirmed after: "<< et << " sec, packet " << xferp->mPacketNum << LL_ENDL; xferp->resendLastPacket(); - xferp = xferp->mNext; } } else if ((xferp->mStatus == e_LL_XFER_REGISTERED) && ( (et = xferp->ACKTimer.getElapsedTimeF32()) > LL_XFER_REGISTRATION_TIMEOUT)) { - LL_INFOS() << "registered xfer never requested, xfer dropped" << LL_ENDL; + LL_INFOS("Xfer") << "registered xfer never requested, xfer dropped" << LL_ENDL; xferp->abort(LL_ERR_TCP_TIMEOUT); - delp = xferp; - xferp = xferp->mNext; - removeXfer(delp,&mSendList); + iter = mSendList.erase(iter); + delete xferp; + continue; } else if (xferp->mStatus == e_LL_XFER_ABORTED) { - LL_WARNS() << "Removing aborted xfer " << xferp->mRemoteHost << ":" << xferp->getFileName() << LL_ENDL; - delp = xferp; - xferp = xferp->mNext; - removeXfer(delp,&mSendList); + LL_WARNS("Xfer") << "Removing aborted xfer " << xferp->mRemoteHost << ":" << xferp->getFileName() << LL_ENDL; + iter = mSendList.erase(iter); + delete xferp; + continue; } else if (xferp->mStatus == e_LL_XFER_PENDING) { -// LL_INFOS() << "*** numActiveXfers = " << numActiveXfers(xferp->mRemoteHost) << " mMaxOutgoingXfersPerCircuit = " << mMaxOutgoingXfersPerCircuit << LL_ENDL; +// LL_INFOS("Xfer") << "*** numActiveXfers = " << numActiveXfers(xferp->mRemoteHost) << " mMaxOutgoingXfersPerCircuit = " << mMaxOutgoingXfersPerCircuit << LL_ENDL; if (numActiveXfers(xferp->mRemoteHost) < mMaxOutgoingXfersPerCircuit) { -// LL_INFOS() << "bumping pending xfer to active" << LL_ENDL; - xferp->sendNextPacket(); - changeNumActiveXfers(xferp->mRemoteHost,1); - } - xferp = xferp->mNext; - } - else - { - xferp = xferp->mNext; + if (xferp->reopenFileHandle()) + { + LL_WARNS("Xfer") << "Error re-opening file handle for xfer ID " << U64_to_str(xferp->mID) + << " to host " << xferp->mRemoteHost << LL_ENDL; + xferp->abort(LL_ERR_CANNOT_OPEN_FILE); + iter = mSendList.erase(iter); + delete xferp; + continue; + } + else + { // No error re-opening the file, send the first packet + LL_DEBUGS("Xfer") << "Moving pending xfer ID " << U64_to_str(xferp->mID) << " to active" << LL_ENDL; + xferp->sendNextPacket(); + changeNumActiveXfers(xferp->mRemoteHost,1); + } + } } - } + ++iter; + } // end while() loop // // HACK - if we're using xfer confirm throttling, throttle our xfer confirms here @@ -1099,10 +1142,10 @@ void LLXferManager::retransmitUnackedPackets () { break; } - //LL_INFOS() << "Confirm packet queue length:" << mXferAckQueue.size() << LL_ENDL; + //LL_INFOS("Xfer") << "Confirm packet queue length:" << mXferAckQueue.size() << LL_ENDL; LLXferAckInfo ack_info = mXferAckQueue.front(); mXferAckQueue.pop_front(); - //LL_INFOS() << "Sending confirm packet" << LL_ENDL; + //LL_INFOS("Xfer") << "Sending confirm packet" << LL_ENDL; sendConfirmPacket(gMessageSystem, ack_info.mID, ack_info.mPacketNum, ack_info.mRemoteHost); mAckThrottle.throttleOverflow(1000.f*8.f); // Assume 1000 bytes/packet } @@ -1112,7 +1155,7 @@ void LLXferManager::retransmitUnackedPackets () void LLXferManager::abortRequestById(U64 xfer_id, S32 result_code) { - LLXfer * xferp = findXfer(xfer_id, mReceiveList); + LLXfer * xferp = findXferByID(xfer_id, mReceiveList); if (xferp) { if (xferp->mStatus == e_LL_XFER_IN_PROGRESS) @@ -1124,7 +1167,7 @@ void LLXferManager::abortRequestById(U64 xfer_id, S32 result_code) { xferp->mCallbackResult = result_code; xferp->processEOF(); //should notify requester - removeXfer(xferp, &mReceiveList); + removeXfer(xferp, mReceiveList); } // Since already removed or marked as aborted no need // to wait for processAbort() to start new download @@ -1143,12 +1186,12 @@ void LLXferManager::processAbort (LLMessageSystem *mesgsys, void ** /*user_data* mesgsys->getU64Fast(_PREHASH_XferID, _PREHASH_ID, id); mesgsys->getS32Fast(_PREHASH_XferID, _PREHASH_Result, result_code); - xferp = findXfer(id, mReceiveList); + xferp = findXferByID(id, mReceiveList); if (xferp) { xferp->mCallbackResult = result_code; xferp->processEOF(); - removeXfer(xferp, &mReceiveList); + removeXfer(xferp, mReceiveList); startPendingDownloads(); } } @@ -1164,27 +1207,29 @@ void LLXferManager::startPendingDownloads() // requests get pushed toward the back. Thus, if we didn't do a // stateful iteration, it would be possible for old requests to // never start. - LLXfer* xferp = mReceiveList; + LLXfer* xferp; std::list<LLXfer*> pending_downloads; S32 download_count = 0; S32 pending_count = 0; - while(xferp) + for (xfer_list_t::iterator iter = mReceiveList.begin(); + iter != mReceiveList.end(); + ++iter) { + xferp = (*iter); if(xferp->mStatus == e_LL_XFER_PENDING) - { + { // Count and accumulate pending downloads ++pending_count; pending_downloads.push_front(xferp); } else if(xferp->mStatus == e_LL_XFER_IN_PROGRESS) - { + { // Count downloads in progress ++download_count; } - xferp = xferp->mNext; } S32 start_count = mMaxIncomingXfers - download_count; - LL_DEBUGS() << "LLXferManager::startPendingDownloads() - XFER_IN_PROGRESS: " + LL_DEBUGS("Xfer") << "LLXferManager::startPendingDownloads() - XFER_IN_PROGRESS: " << download_count << " XFER_PENDING: " << pending_count << " startring " << llmin(start_count, pending_count) << LL_ENDL; @@ -1209,29 +1254,15 @@ void LLXferManager::startPendingDownloads() /////////////////////////////////////////////////////////// -void LLXferManager::addToList(LLXfer* xferp, LLXfer*& head, BOOL is_priority) +void LLXferManager::addToList(LLXfer* xferp, xfer_list_t & xfer_list, BOOL is_priority) { if(is_priority) { - xferp->mNext = NULL; - LLXfer* next = head; - if(next) - { - while(next->mNext) - { - next = next->mNext; - } - next->mNext = xferp; - } - else - { - head = xferp; - } + xfer_list.push_back(xferp); } else { - xferp->mNext = head; - head = xferp; + xfer_list.push_front(xferp); } } diff --git a/indra/llmessage/llxfermanager.h b/indra/llmessage/llxfermanager.h index d258f0a5ce..45ae2ffdd3 100644 --- a/indra/llmessage/llxfermanager.h +++ b/indra/llmessage/llxfermanager.h @@ -77,6 +77,7 @@ class LLXferManager protected: S32 mMaxOutgoingXfersPerCircuit; + S32 mHardLimitOutgoingXfersPerCircuit; // At this limit, kill off the connection S32 mMaxIncomingXfers; BOOL mUseAckThrottling; // Use ack throttling to cap file xfer bandwidth @@ -92,19 +93,22 @@ class LLXferManager HIGH_PRIORITY = TRUE, }; - LLXfer *mSendList; - LLXfer *mReceiveList; + // Linked FIFO list, add to the front and pull from back + typedef std::deque<LLXfer *> xfer_list_t; + xfer_list_t mSendList; + xfer_list_t mReceiveList; typedef std::list<LLHostStatus*> status_list_t; status_list_t mOutgoingHosts; - private: protected: // implementation methods virtual void startPendingDownloads(); - virtual void addToList(LLXfer* xferp, LLXfer*& head, BOOL is_priority); + virtual void addToList(LLXfer* xferp, xfer_list_t & xfer_list, BOOL is_priority); std::multiset<std::string> mExpectedTransfers; // files that are authorized to transfer out std::multiset<std::string> mExpectedRequests; // files that are authorized to be downloaded on top of + std::multiset<std::string> mExpectedVFileTransfers; // files that are authorized to transfer out + std::multiset<std::string> mExpectedVFileRequests; // files that are authorized to be downloaded on top of public: LLXferManager(LLVFS *vfs); @@ -117,14 +121,17 @@ class LLXferManager void setAckThrottleBPS(const F32 bps); // list management routines - virtual LLXfer *findXfer(U64 id, LLXfer *list_head); - virtual void removeXfer (LLXfer *delp, LLXfer **list_head); - virtual U32 numActiveListEntries(LLXfer *list_head); + virtual LLXfer *findXferByID(U64 id, xfer_list_t & xfer_list); + virtual void removeXfer (LLXfer *delp, xfer_list_t & xfer_list); + + LLHostStatus * findHostStatus(const LLHost &host); virtual S32 numActiveXfers(const LLHost &host); virtual S32 numPendingXfers(const LLHost &host); + virtual void changeNumActiveXfers(const LLHost &host, S32 delta); virtual void setMaxOutgoingXfersPerCircuit (S32 max_num); + virtual void setHardLimitOutgoingXfersPerCircuit(S32 max_num); virtual void setMaxIncomingXfers(S32 max_num); virtual void updateHostStatus(); virtual void printHostStatus(); @@ -136,8 +143,6 @@ class LLXferManager virtual S32 decodePacketNum(S32 packet_num); virtual BOOL isLastPacket(S32 packet_num); - virtual U64 registerXfer(const void *datap, const S32 length); - // file requesting routines // .. to file virtual U64 requestFile(const std::string& local_filename, @@ -148,7 +153,7 @@ class LLXferManager void (*callback)(void**,S32,LLExtStat), void** user_data, BOOL is_priority = FALSE, BOOL use_big_packets = FALSE); - + /* // .. to memory virtual void requestFile(const std::string& remote_filename, ELLPath remote_path, @@ -157,7 +162,7 @@ class LLXferManager void (*callback)(void*, S32, void**, S32, LLExtStat), void** user_data, BOOL is_priority = FALSE); - + */ // vfile requesting // .. to vfile virtual void requestVFile(const LLUUID &local_id, const LLUUID& remote_id, @@ -180,18 +185,15 @@ class LLXferManager virtual void expectFileForRequest(const std::string& filename); virtual bool validateFileForRequest(const std::string& filename); -/* -// xfer request (may be memory or file) -// .. to file - virtual void requestXfer(const char *local_filename, U64 xfer_id, - BOOL delete_remote_on_completion, - const LLHost &remote_host, void (*callback)(void **,S32),void **user_data); -// .. to memory - virtual void requestXfer(U64 xfer_id, - const LLHost &remote_host, - BOOL delete_remote_on_completion, - void (*callback)(void *, S32, void **, S32),void **user_data); -*/ + /** + Same idea but for VFiles, kept separate to avoid namespace overlap + */ + /* Present in fireengine, not used by viewer + virtual void expectVFileForTransfer(const std::string& filename); + virtual bool validateVFileForTransfer(const std::string& filename); + virtual void expectVFileForRequest(const std::string& filename); + virtual bool validateVFileForRequest(const std::string& filename); + */ virtual void processReceiveData (LLMessageSystem *mesgsys, void **user_data); virtual void sendConfirmPacket (LLMessageSystem *mesgsys, U64 id, S32 packetnum, const LLHost &remote_host); @@ -204,6 +206,8 @@ class LLXferManager // error handling void abortRequestById(U64 xfer_id, S32 result_code); virtual void processAbort (LLMessageSystem *mesgsys, void **user_data); + + virtual bool isHostFlooded(const LLHost & host); }; extern LLXferManager* gXferManager; |