diff options
author | andreykproductengine <andreykproductengine@lindenlab.com> | 2018-04-19 18:37:35 +0300 |
---|---|---|
committer | andreykproductengine <andreykproductengine@lindenlab.com> | 2018-04-19 18:37:35 +0300 |
commit | 4d3642b2fab163668b1d67c0a6b631417e71e02d (patch) | |
tree | 0d8e821e90662098ed69138451c4d36a1128542f | |
parent | 1fc32d4c973275cf86d2b936e7bb3241e0155197 (diff) |
MAINT-7626 Incorporate transfer changes into viewer
-rw-r--r-- | indra/llmessage/llxfer.cpp | 43 | ||||
-rw-r--r-- | indra/llmessage/llxfer.h | 6 | ||||
-rw-r--r-- | indra/llmessage/llxfer_file.cpp | 40 | ||||
-rw-r--r-- | indra/llmessage/llxfer_file.h | 6 | ||||
-rw-r--r-- | indra/llmessage/llxfer_mem.cpp | 22 | ||||
-rw-r--r-- | indra/llmessage/llxfer_mem.h | 1 | ||||
-rw-r--r-- | indra/llmessage/llxfer_vfile.cpp | 32 | ||||
-rw-r--r-- | indra/llmessage/llxfer_vfile.h | 6 | ||||
-rw-r--r-- | indra/llmessage/llxfermanager.cpp | 425 | ||||
-rw-r--r-- | indra/llmessage/llxfermanager.h | 21 |
10 files changed, 364 insertions, 238 deletions
diff --git a/indra/llmessage/llxfer.cpp b/indra/llmessage/llxfer.cpp index e0590dfdff..a42ecefd23 100644 --- a/indra/llmessage/llxfer.cpp +++ b/indra/llmessage/llxfer.cpp @@ -99,7 +99,22 @@ void LLXfer::cleanup () S32 LLXfer::startSend (U64 xfer_id, const LLHost &remote_host) { - LL_WARNS() << "undifferentiated LLXfer::startSend for " << getFileName() << LL_ENDL; + LL_WARNS("Xfer") << "unexpected call to base class LLXfer::startSend for " << getFileName() << LL_ENDL; + return (-1); +} + +/////////////////////////////////////////////////////////// + +void LLXfer::closeFileHandle() +{ + LL_WARNS("Xfer") << "unexpected call to base class LLXfer::closeFileHandle for " << getFileName() << LL_ENDL; +} + +/////////////////////////////////////////////////////////// + +S32 LLXfer::reopenFileHandle() +{ + LL_WARNS("Xfer") << "unexpected call to base class LLXfer::reopenFileHandle for " << getFileName() << LL_ENDL; return (-1); } @@ -127,14 +142,14 @@ S32 LLXfer::receiveData (char *datap, S32 data_size) S32 retval = 0; if (((S32) mBufferLength + data_size) > getMaxBufferSize()) - { + { // Write existing data to disk if it's larger than the buffer size retval = flush(); } if (!retval) { if (datap != NULL) - { + { // Append new data to mBuffer memcpy(&mBuffer[mBufferLength],datap,data_size); /*Flawfinder: ignore*/ mBufferLength += data_size; } @@ -248,7 +263,12 @@ void LLXfer::sendPacket(S32 packet_num) gMessageSystem->nextBlockFast(_PREHASH_DataPacket); gMessageSystem->addBinaryDataFast(_PREHASH_Data, &fdata_buf,fdata_size); - gMessageSystem->sendMessage(mRemoteHost); + S32 sent_something = gMessageSystem->sendMessage(mRemoteHost); + if (sent_something == 0) + { + abort(LL_ERR_CIRCUIT_GONE); + return; + } ACKTimer.reset(); mWaitingForACK = TRUE; @@ -326,12 +346,15 @@ void LLXfer::abort (S32 result_code) LL_INFOS() << "Aborting xfer from " << mRemoteHost << " named " << getFileName() << " - error: " << result_code << LL_ENDL; - gMessageSystem->newMessageFast(_PREHASH_AbortXfer); - gMessageSystem->nextBlockFast(_PREHASH_XferID); - gMessageSystem->addU64Fast(_PREHASH_ID, mID); - gMessageSystem->addS32Fast(_PREHASH_Result, result_code); - - gMessageSystem->sendMessage(mRemoteHost); + if (result_code != LL_ERR_CIRCUIT_GONE) + { + gMessageSystem->newMessageFast(_PREHASH_AbortXfer); + gMessageSystem->nextBlockFast(_PREHASH_XferID); + gMessageSystem->addU64Fast(_PREHASH_ID, mID); + gMessageSystem->addS32Fast(_PREHASH_Result, result_code); + + gMessageSystem->sendMessage(mRemoteHost); + } mStatus = e_LL_XFER_ABORTED; } diff --git a/indra/llmessage/llxfer.h b/indra/llmessage/llxfer.h index edf5eeb82d..3be8da4fbe 100644 --- a/indra/llmessage/llxfer.h +++ b/indra/llmessage/llxfer.h @@ -62,7 +62,7 @@ class LLXfer S32 mXferSize; char *mBuffer; - U32 mBufferLength; + U32 mBufferLength; // Size of valid data, not actual allocated buffer size U32 mBufferStartOffset; BOOL mBufferContainsEOF; @@ -90,7 +90,9 @@ class LLXfer void init(S32 chunk_size); virtual void cleanup(); - virtual S32 startSend (U64 xfer_id, const LLHost &remote_host); + virtual S32 startSend(U64 xfer_id, const LLHost &remote_host); + virtual void closeFileHandle(); + virtual S32 reopenFileHandle(); virtual void sendPacket(S32 packet_num); virtual void sendNextPacket(); virtual void resendLastPacket(); diff --git a/indra/llmessage/llxfer_file.cpp b/indra/llmessage/llxfer_file.cpp index 8e2ed890e7..ad55e389f3 100644 --- a/indra/llmessage/llxfer_file.cpp +++ b/indra/llmessage/llxfer_file.cpp @@ -206,7 +206,8 @@ S32 LLXfer_File::startSend (U64 xfer_id, const LLHost &remote_host) mBufferLength = 0; mBufferStartOffset = 0; - + + // We leave the file open, assuming we'll start reading and sending soon mFp = LLFile::fopen(mLocalFilename,"rb"); /* Flawfinder : ignore */ if (mFp) { @@ -233,6 +234,36 @@ S32 LLXfer_File::startSend (U64 xfer_id, const LLHost &remote_host) } /////////////////////////////////////////////////////////// +void LLXfer_File::closeFileHandle() +{ + if (mFp) + { + fclose(mFp); + mFp = NULL; + } +} + +/////////////////////////////////////////////////////////// + +S32 LLXfer_File::reopenFileHandle() +{ + S32 retval = LL_ERR_NOERR; // presume success + + if (mFp == NULL) + { + mFp = LLFile::fopen(mLocalFilename,"rb"); /* Flawfinder : ignore */ + if (mFp == NULL) + { + LL_INFOS("Xfer") << "Warning: " << mLocalFilename << " not found when re-opening file" << LL_ENDL; + retval = LL_ERR_FILE_NOT_FOUND; + } + } + + return retval; +} + + +/////////////////////////////////////////////////////////// S32 LLXfer_File::getMaxBufferSize () { @@ -285,9 +316,12 @@ S32 LLXfer_File::flush() if (mFp) { - if (fwrite(mBuffer,1,mBufferLength,mFp) != mBufferLength) + S32 write_size = fwrite(mBuffer,1,mBufferLength,mFp); + if (write_size != mBufferLength) { - LL_WARNS() << "Short write" << LL_ENDL; + LL_WARNS("Xfer") << "Non-matching write size, requested " << mBufferLength + << " but wrote " << write_size + << LL_ENDL; } // LL_INFOS() << "******* wrote " << mBufferLength << " bytes of file xfer" << LL_ENDL; diff --git a/indra/llmessage/llxfer_file.h b/indra/llmessage/llxfer_file.h index a37dda6732..ab9374549e 100644 --- a/indra/llmessage/llxfer_file.h +++ b/indra/llmessage/llxfer_file.h @@ -61,8 +61,10 @@ class LLXfer_File : public LLXfer virtual S32 startDownload(); virtual S32 processEOF(); - - virtual S32 startSend (U64 xfer_id, const LLHost &remote_host); + + virtual S32 startSend(U64 xfer_id, const LLHost &remote_host); + virtual void closeFileHandle(); + virtual S32 reopenFileHandle(); virtual S32 suck(S32 start_position); virtual S32 flush(); diff --git a/indra/llmessage/llxfer_mem.cpp b/indra/llmessage/llxfer_mem.cpp index 3bea08f2e5..78a3e4f558 100644 --- a/indra/llmessage/llxfer_mem.cpp +++ b/indra/llmessage/llxfer_mem.cpp @@ -80,28 +80,6 @@ void LLXfer_Mem::setXferSize (S32 xfer_size) /////////////////////////////////////////////////////////// -U64 LLXfer_Mem::registerXfer(U64 xfer_id, const void *datap, const S32 length) -{ - mID = xfer_id; - - if (datap) - { - setXferSize(length); - if (mBuffer) - { - memcpy(mBuffer,datap,length); /* Flawfinder : ignore */ - mBufferLength = length; - } - else - { - xfer_id = 0; - } - } - - mStatus = e_LL_XFER_REGISTERED; - return (xfer_id); -} - S32 LLXfer_Mem::startSend (U64 xfer_id, const LLHost &remote_host) { S32 retval = LL_ERR_NOERR; // presume success diff --git a/indra/llmessage/llxfer_mem.h b/indra/llmessage/llxfer_mem.h index b5adf837df..d07779de87 100644 --- a/indra/llmessage/llxfer_mem.h +++ b/indra/llmessage/llxfer_mem.h @@ -53,7 +53,6 @@ class LLXfer_Mem : public LLXfer virtual void cleanup(); virtual S32 startSend (U64 xfer_id, const LLHost &remote_host); - virtual U64 registerXfer(U64 xfer_id, const void *datap, const S32 length); virtual void setXferSize (S32 data_size); virtual S32 initializeRequest(U64 xfer_id, diff --git a/indra/llmessage/llxfer_vfile.cpp b/indra/llmessage/llxfer_vfile.cpp index 4a378d1d34..5d41180192 100644 --- a/indra/llmessage/llxfer_vfile.cpp +++ b/indra/llmessage/llxfer_vfile.cpp @@ -205,6 +205,38 @@ S32 LLXfer_VFile::startSend (U64 xfer_id, const LLHost &remote_host) } /////////////////////////////////////////////////////////// + +void LLXfer_VFile::closeFileHandle() +{ + if (mVFile) + { + delete mVFile; + mVFile = NULL; + } +} + +/////////////////////////////////////////////////////////// + +S32 LLXfer_VFile::reopenFileHandle() +{ + S32 retval = LL_ERR_NOERR; // presume success + + if (mVFile == NULL) + { + mVFile =new LLVFile(mVFS, mLocalID, mType, LLVFile::READ); + if (mVFile == NULL) + { + LL_WARNS("Xfer") << "LLXfer_VFile::reopenFileHandle() can't read VFS file " << mLocalID << "." << LLAssetType::lookup(mType) << LL_ENDL; + retval = LL_ERR_FILE_NOT_FOUND; + } + } + + return retval; +} + + +/////////////////////////////////////////////////////////// + void LLXfer_VFile::setXferSize (S32 xfer_size) { LLXfer::setXferSize(xfer_size); diff --git a/indra/llmessage/llxfer_vfile.h b/indra/llmessage/llxfer_vfile.h index 048bf49dcc..b606c08122 100644 --- a/indra/llmessage/llxfer_vfile.h +++ b/indra/llmessage/llxfer_vfile.h @@ -66,8 +66,10 @@ class LLXfer_VFile : public LLXfer virtual S32 startDownload(); virtual S32 processEOF(); - - virtual S32 startSend (U64 xfer_id, const LLHost &remote_host); + + virtual S32 startSend(U64 xfer_id, const LLHost &remote_host); + virtual void closeFileHandle(); + virtual S32 reopenFileHandle(); virtual S32 suck(S32 start_position); virtual S32 flush(); diff --git a/indra/llmessage/llxfermanager.cpp b/indra/llmessage/llxfermanager.cpp index 2ceb64ce8f..8f18004791 100644 --- a/indra/llmessage/llxfermanager.cpp +++ b/indra/llmessage/llxfermanager.cpp @@ -44,6 +44,8 @@ const S32 LL_PACKET_RETRY_LIMIT = 10; // packet retransmission limit const S32 LL_DEFAULT_MAX_SIMULTANEOUS_XFERS = 10; const S32 LL_DEFAULT_MAX_REQUEST_FIFO_XFERS = 1000; +const S32 LL_DEFAULT_MAX_HARD_LIMIT_SIMULTANEOUS_XFERS = 500; + #define LL_XFER_PROGRESS_MESSAGES 0 #define LL_XFER_TEST_REXMIT 0 @@ -66,10 +68,10 @@ LLXferManager::~LLXferManager () void LLXferManager::init (LLVFS *vfs) { - mSendList = NULL; - mReceiveList = NULL; + cleanup(); setMaxOutgoingXfersPerCircuit(LL_DEFAULT_MAX_SIMULTANEOUS_XFERS); + setHardLimitOutgoingXfersPerCircuit(LL_DEFAULT_MAX_HARD_LIMIT_SIMULTANEOUS_XFERS); setMaxIncomingXfers(LL_DEFAULT_MAX_REQUEST_FIFO_XFERS); mVFS = vfs; @@ -83,29 +85,14 @@ void LLXferManager::init (LLVFS *vfs) void LLXferManager::cleanup () { - LLXfer *xferp; - LLXfer *delp; - for_each(mOutgoingHosts.begin(), mOutgoingHosts.end(), DeletePointer()); mOutgoingHosts.clear(); - delp = mSendList; - while (delp) - { - xferp = delp->mNext; - delete delp; - delp = xferp; - } - mSendList = NULL; + for_each(mSendList.begin(), mSendList.end(), DeletePointer()); + mSendList.clear(); - delp = mReceiveList; - while (delp) - { - xferp = delp->mNext; - delete delp; - delp = xferp; - } - mReceiveList = NULL; + for_each(mReceiveList.begin(), mReceiveList.end(), DeletePointer()); + mReceiveList.clear(); } /////////////////////////////////////////////////////////// @@ -122,6 +109,11 @@ void LLXferManager::setMaxOutgoingXfersPerCircuit(S32 max_num) mMaxOutgoingXfersPerCircuit = max_num; } +void LLXferManager::setHardLimitOutgoingXfersPerCircuit(S32 max_num) +{ + mHardLimitOutgoingXfersPerCircuit = max_num; +} + void LLXferManager::setUseAckThrottling(const BOOL use) { mUseAckThrottling = use; @@ -148,19 +140,18 @@ void LLXferManager::setAckThrottleBPS(const F32 bps) void LLXferManager::updateHostStatus() { - LLXfer *xferp; - LLHostStatus *host_statusp = NULL; - for_each(mOutgoingHosts.begin(), mOutgoingHosts.end(), DeletePointer()); mOutgoingHosts.clear(); - for (xferp = mSendList; xferp; xferp = xferp->mNext) + for (xfer_list_t::iterator send_iter = mSendList.begin(); + send_iter != mSendList.end(); ++send_iter) { + LLHostStatus *host_statusp = NULL; for (status_list_t::iterator iter = mOutgoingHosts.begin(); iter != mOutgoingHosts.end(); ++iter) { host_statusp = *iter; - if (host_statusp->mHost == xferp->mRemoteHost) + if (host_statusp->mHost == (*send_iter)->mRemoteHost) { break; } @@ -170,23 +161,22 @@ void LLXferManager::updateHostStatus() host_statusp = new LLHostStatus(); if (host_statusp) { - host_statusp->mHost = xferp->mRemoteHost; + host_statusp->mHost = (*send_iter)->mRemoteHost; mOutgoingHosts.push_front(host_statusp); } } if (host_statusp) { - if (xferp->mStatus == e_LL_XFER_PENDING) + if ((*send_iter)->mStatus == e_LL_XFER_PENDING) { host_statusp->mNumPending++; } - else if (xferp->mStatus == e_LL_XFER_IN_PROGRESS) + else if ((*send_iter)->mStatus == e_LL_XFER_IN_PROGRESS) { host_statusp->mNumActive++; } } - - } + } } /////////////////////////////////////////////////////////// @@ -209,14 +199,15 @@ void LLXferManager::printHostStatus() /////////////////////////////////////////////////////////// -LLXfer *LLXferManager::findXfer (U64 id, LLXfer *list_head) +LLXfer *LLXferManager::findXfer(U64 id, xfer_list_t & xfer_list) { - LLXfer *xferp; - for (xferp = list_head; xferp; xferp = xferp->mNext) + for (xfer_list_t::iterator iter = xfer_list.begin(); + iter != xfer_list.end(); + ++iter) { - if (xferp->mID == id) + if ((*iter)->mID == id) { - return(xferp); + return(*iter); } } return(NULL); @@ -225,29 +216,34 @@ LLXfer *LLXferManager::findXfer (U64 id, LLXfer *list_head) /////////////////////////////////////////////////////////// -void LLXferManager::removeXfer (LLXfer *delp, LLXfer **list_head) +void LLXferManager::removeXfer(LLXfer *delp, xfer_list_t & xfer_list) { - // This function assumes that delp will only occur in the list - // zero or one times. if (delp) - { - if (*list_head == delp) + { + std::string direction = "send"; + if (&xfer_list == &mReceiveList) { - *list_head = delp->mNext; - delete (delp); + std::string direction = "receive"; + xfer_list = mSendList; } - else + + // This assumes that delp will occur in the list once at most + // Find the pointer in the list + for (xfer_list_t::iterator iter = xfer_list.begin(); + iter != xfer_list.end(); + ++iter) { - LLXfer *xferp = *list_head; - while (xferp->mNext) + if ((*iter) == delp) { - if (xferp->mNext == delp) - { - xferp->mNext = delp->mNext; - delete (delp); - break; - } - xferp = xferp->mNext; + LL_DEBUGS("Xfer") << "Deleting xfer to host " << (*iter)->mRemoteHost + << " of " << (*iter)->mXferSize << " bytes" + << ", status " << (S32)((*iter)->mStatus) + << " from the " << direction << " list" + << LL_ENDL; + + xfer_list.erase(iter); + delete (delp); + break; } } } @@ -272,7 +268,7 @@ U32 LLXferManager::numActiveListEntries(LLXfer *list_head) /////////////////////////////////////////////////////////// -S32 LLXferManager::numPendingXfers(const LLHost &host) +LLHostStatus * LLXferManager::findHostStatus(const LLHost &host) { LLHostStatus *host_statusp = NULL; @@ -282,26 +278,32 @@ S32 LLXferManager::numPendingXfers(const LLHost &host) host_statusp = *iter; if (host_statusp->mHost == host) { - return (host_statusp->mNumPending); + return (host_statusp); } } return 0; } /////////////////////////////////////////////////////////// + +S32 LLXferManager::numPendingXfers(const LLHost &host) +{ + LLHostStatus *host_statusp = findHostStatus(host); + if (host_statusp) + { + return host_statusp->mNumPending; + } + return 0; +} + +/////////////////////////////////////////////////////////// S32 LLXferManager::numActiveXfers(const LLHost &host) { - LLHostStatus *host_statusp = NULL; - - for (status_list_t::iterator iter = mOutgoingHosts.begin(); - iter != mOutgoingHosts.end(); ++iter) + LLHostStatus *host_statusp = findHostStatus(host); + if (host_statusp) { - host_statusp = *iter; - if (host_statusp->mHost == host) - { - return (host_statusp->mNumActive); - } + return host_statusp->mNumActive; } return 0; } @@ -372,35 +374,6 @@ BOOL LLXferManager::isLastPacket(S32 packet_num) /////////////////////////////////////////////////////////// -U64 LLXferManager::registerXfer(const void *datap, const S32 length) -{ - LLXfer *xferp; - U64 xfer_id = getNextID(); - - xferp = (LLXfer *) new LLXfer_Mem(); - if (xferp) - { - xferp->mNext = mSendList; - mSendList = xferp; - - xfer_id = ((LLXfer_Mem *)xferp)->registerXfer(xfer_id, datap,length); - - if (!xfer_id) - { - removeXfer(xferp,&mSendList); - } - } - else - { - LL_ERRS() << "Xfer allocation error" << LL_ENDL; - xfer_id = 0; - } - - return(xfer_id); -} - -/////////////////////////////////////////////////////////// - U64 LLXferManager::requestFile(const std::string& local_filename, const std::string& remote_filename, ELLPath remote_path, @@ -411,30 +384,33 @@ U64 LLXferManager::requestFile(const std::string& local_filename, BOOL is_priority, BOOL use_big_packets) { - LLXfer *xferp; + LLXfer_File* file_xfer_p = NULL; - for (xferp = mReceiveList; xferp ; xferp = xferp->mNext) + for (xfer_list_t::iterator iter = mReceiveList.begin(); + iter != mReceiveList.end(); ++iter) { - if (xferp->getXferTypeTag() == LLXfer::XFER_FILE - && (((LLXfer_File*)xferp)->matchesLocalFilename(local_filename)) - && (((LLXfer_File*)xferp)->matchesRemoteFilename(remote_filename, remote_path)) - && (remote_host == xferp->mRemoteHost) - && (callback == xferp->mCallback) - && (user_data == xferp->mCallbackDataHandle)) - + if ((*iter)->getXferTypeTag() == LLXfer::XFER_FILE) { - // cout << "requested a xfer already in progress" << endl; - return xferp->mID; + file_xfer_p = (LLXfer_File*)(*iter); + if (file_xfer_p->matchesLocalFilename(local_filename) + && file_xfer_p->matchesRemoteFilename(remote_filename, remote_path) + && (remote_host == file_xfer_p->mRemoteHost) + && (callback == file_xfer_p->mCallback) + && (user_data == file_xfer_p->mCallbackDataHandle)) + { + // Already have the request (already in progress) + return (*iter)->mID; + } } } U64 xfer_id = 0; S32 chunk_size = use_big_packets ? LL_XFER_LARGE_PAYLOAD : -1; - xferp = (LLXfer *) new LLXfer_File(chunk_size); - if (xferp) + file_xfer_p = new LLXfer_File(chunk_size); + if (file_xfer_p) { - addToList(xferp, mReceiveList, is_priority); + addToList(file_xfer_p, mReceiveList, is_priority); // Remove any file by the same name that happens to be lying // around. @@ -447,7 +423,7 @@ U64 LLXferManager::requestFile(const std::string& local_filename, LLFile::remove(local_filename, ENOENT); } xfer_id = getNextID(); - ((LLXfer_File *)xferp)->initializeRequest( + ((LLXfer_File *)file_xfer_p)->initializeRequest( xfer_id, local_filename, remote_filename, @@ -500,28 +476,32 @@ void LLXferManager::requestVFile(const LLUUID& local_id, void** user_data, BOOL is_priority) { - LLXfer *xferp; + LLXfer_VFile * xfer_p = NULL; - for (xferp = mReceiveList; xferp ; xferp = xferp->mNext) + for (xfer_list_t::iterator iter = mReceiveList.begin(); + iter != mReceiveList.end(); ++iter) { - if (xferp->getXferTypeTag() == LLXfer::XFER_VFILE - && (((LLXfer_VFile*)xferp)->matchesLocalFile(local_id, type)) - && (((LLXfer_VFile*)xferp)->matchesRemoteFile(remote_id, type)) - && (remote_host == xferp->mRemoteHost) - && (callback == xferp->mCallback) - && (user_data == xferp->mCallbackDataHandle)) - + if ((*iter)->getXferTypeTag() == LLXfer::XFER_VFILE) { - // cout << "requested a xfer already in progress" << endl; - return; + xfer_p = (LLXfer_VFile*) (*iter); + if (xfer_p->matchesLocalFile(local_id, type) + && xfer_p->matchesRemoteFile(remote_id, type) + && (remote_host == xfer_p->mRemoteHost) + && (callback == xfer_p->mCallback) + && (user_data == xfer_p->mCallbackDataHandle)) + + { + // Have match, already in progress, don't add a duplicate + return; + } } } - xferp = (LLXfer *) new LLXfer_VFile(); - if (xferp) + xfer_p = new LLXfer_VFile(); + if (xfer_p) { - addToList(xferp, mReceiveList, is_priority); - ((LLXfer_VFile *)xferp)->initializeRequest(getNextID(), + addToList(xfer_p, mReceiveList, is_priority); + ((LLXfer_VFile *)xfer_p)->initializeRequest(getNextID(), vfs, local_id, remote_id, @@ -663,7 +643,7 @@ void LLXferManager::processReceiveData (LLMessageSystem *mesgsys, void ** /*user if (result == LL_ERR_CANNOT_OPEN_FILE) { xferp->abort(LL_ERR_CANNOT_OPEN_FILE); - removeXfer(xferp,&mReceiveList); + removeXfer(xferp,mReceiveList); startPendingDownloads(); return; } @@ -688,7 +668,7 @@ void LLXferManager::processReceiveData (LLMessageSystem *mesgsys, void ** /*user if (isLastPacket(packetnum)) { xferp->processEOF(); - removeXfer(xferp,&mReceiveList); + removeXfer(xferp,mReceiveList); startPendingDownloads(); } } @@ -708,6 +688,7 @@ void LLXferManager::sendConfirmPacket (LLMessageSystem *mesgsys, U64 id, S32 pac mesgsys->addU64Fast(_PREHASH_ID, id); mesgsys->addU32Fast(_PREHASH_Packet, packetnum); + // Ignore a circuit failure here, we'll catch it with another message mesgsys->sendMessage(remote_host); } @@ -825,7 +806,7 @@ void LLXferManager::processFileRequest (LLMessageSystem *mesgsys, void ** /*user LLXfer *xferp; if (uuid != LLUUID::null) - { + { // Request for an asset - use a VFS file if(NULL == LLAssetType::lookup(type)) { LL_WARNS() << "Invalid type for xfer request: " << uuid << ":" @@ -844,8 +825,7 @@ void LLXferManager::processFileRequest (LLMessageSystem *mesgsys, void ** /*user xferp = (LLXfer *)new LLXfer_VFile(mVFS, uuid, type); if (xferp) { - xferp->mNext = mSendList; - mSendList = xferp; + mSendList.push_front(xferp); result = xferp->startSend(id,mesgsys->getSender()); } else @@ -854,7 +834,7 @@ void LLXferManager::processFileRequest (LLMessageSystem *mesgsys, void ** /*user } } else if (!local_filename.empty()) - { + { // Was given a file name to send // See DEV-21775 for detailed security issues if (local_path == LL_PATH_NONE) @@ -912,8 +892,7 @@ void LLXferManager::processFileRequest (LLMessageSystem *mesgsys, void ** /*user if (xferp) { - xferp->mNext = mSendList; - mSendList = xferp; + mSendList.push_front(xferp); result = xferp->startSend(id,mesgsys->getSender()); } else @@ -922,7 +901,7 @@ void LLXferManager::processFileRequest (LLMessageSystem *mesgsys, void ** /*user } } else - { + { // no uuid or filename - use the ID sent char U64_BUF[MAX_STRING]; /* Flawfinder : ignore */ LL_INFOS() << "starting memory transfer: " << U64_to_str(id, U64_BUF, sizeof(U64_BUF)) << " to " @@ -946,7 +925,7 @@ void LLXferManager::processFileRequest (LLMessageSystem *mesgsys, void ** /*user if (xferp) { xferp->abort(result); - removeXfer(xferp,&mSendList); + removeXfer(xferp, mSendList); } else // can happen with a memory transfer not found { @@ -960,24 +939,86 @@ void LLXferManager::processFileRequest (LLMessageSystem *mesgsys, void ** /*user mesgsys->sendMessage(mesgsys->getSender()); } } - else if(xferp && (numActiveXfers(xferp->mRemoteHost) < mMaxOutgoingXfersPerCircuit)) - { - xferp->sendNextPacket(); - changeNumActiveXfers(xferp->mRemoteHost,1); -// LL_INFOS() << "***STARTING XFER IMMEDIATELY***" << LL_ENDL; - } - else + else if(xferp) { - if(xferp) + // Figure out how many transfers the host has requested + LLHostStatus *host_statusp = findHostStatus(xferp->mRemoteHost); + if (host_statusp) { - LL_INFOS() << " queueing xfer request, " << numPendingXfers(xferp->mRemoteHost) << " ahead of this one" << LL_ENDL; + if (host_statusp->mNumActive < mMaxOutgoingXfersPerCircuit) + { // Not many transfers in progress already, so start immediately + xferp->sendNextPacket(); + changeNumActiveXfers(xferp->mRemoteHost,1); + LL_DEBUGS("Xfer") << "Starting xfer ID " << U64_to_str(id) << " immediately" << LL_ENDL; + } + else if (mHardLimitOutgoingXfersPerCircuit == 0 || + (host_statusp->mNumActive + host_statusp->mNumPending) < mHardLimitOutgoingXfersPerCircuit) + { // Must close the file handle and wait for earlier ones to complete + LL_INFOS("Xfer") << " queueing xfer request id " << U64_to_str(id) << ", " + << host_statusp->mNumActive << " active and " + << host_statusp->mNumPending << " pending ahead of this one" + << LL_ENDL; + xferp->closeFileHandle(); // Close the file handle until we're ready to send again + } + else if (mHardLimitOutgoingXfersPerCircuit > 0) + { // Way too many requested ... it's time to stop being nice and kill the circuit + xferp->closeFileHandle(); // Close the file handle in any case + LLCircuitData *cdp = gMessageSystem->mCircuitInfo.findCircuit(xferp->mRemoteHost); + if (cdp) + { + if (cdp->getTrusted()) + { // Trusted internal circuit - don't kill it + LL_WARNS("Xfer") << "Trusted circuit to " << xferp->mRemoteHost << " has too many xfer requests in the queue " + << host_statusp->mNumActive << " active and " + << host_statusp->mNumPending << " pending ahead of this one" + << LL_ENDL; + } + else + { // Untrusted circuit - time to stop messing around and kill it + LL_WARNS("Xfer") << "Killing circuit to " << xferp->mRemoteHost << " for having too many xfer requests in the queue " + << host_statusp->mNumActive << " active and " + << host_statusp->mNumPending << " pending ahead of this one" + << LL_ENDL; + gMessageSystem->disableCircuit(xferp->mRemoteHost); + } + } + else + { // WTF? Why can't we find a circuit? Try to kill it off + LL_WARNS("Xfer") << "Backlog with circuit to " << xferp->mRemoteHost << " with too many xfer requests in the queue " + << host_statusp->mNumActive << " active and " + << host_statusp->mNumPending << " pending ahead of this one" + << " but no LLCircuitData found???" + << LL_ENDL; + gMessageSystem->disableCircuit(xferp->mRemoteHost); + } + } } else { - LL_WARNS() << "LLXferManager::processFileRequest() - no xfer found!" - << LL_ENDL; + LL_WARNS("Xfer") << "LLXferManager::processFileRequest() - no LLHostStatus found for id " << U64_to_str(id) + << " host " << xferp->mRemoteHost << LL_ENDL; } } + else + { + LL_WARNS("Xfer") << "LLXferManager::processFileRequest() - no xfer found for id " << U64_to_str(id) << LL_ENDL; + } +} + +/////////////////////////////////////////////////////////// + +// Return true if host is in a transfer-flood sitation. Same check for both internal and external hosts +bool LLXferManager::isHostFlooded(const LLHost & host) +{ + bool flooded = false; + LLHostStatus *host_statusp = findHostStatus(host); + if (host_statusp) + { + flooded = (mHardLimitOutgoingXfersPerCircuit > 0 && + (host_statusp->mNumActive + host_statusp->mNumPending) >= (S32)(mHardLimitOutgoingXfersPerCircuit * 0.8f)); + } + + return flooded; } @@ -1002,52 +1043,57 @@ void LLXferManager::processConfirmation (LLMessageSystem *mesgsys, void ** /*use } else { - removeXfer(xferp, &mSendList); + removeXfer(xferp, mSendList); } } } /////////////////////////////////////////////////////////// -void LLXferManager::retransmitUnackedPackets () +void LLXferManager::retransmitUnackedPackets() { LLXfer *xferp; - LLXfer *delp; - xferp = mReceiveList; - while(xferp) + + xfer_list_t::iterator iter = mReceiveList.begin(); + while (iter != mReceiveList.end()) { + xferp = (*iter); if (xferp->mStatus == e_LL_XFER_IN_PROGRESS) { // if the circuit dies, abort if (! gMessageSystem->mCircuitInfo.isCircuitAlive( xferp->mRemoteHost )) { - LL_INFOS() << "Xfer found in progress on dead circuit, aborting" << LL_ENDL; + LL_WARNS("Xfer") << "Xfer found in progress on dead circuit, aborting transfer to " + << xferp->mRemoteHost.getIPandPort() + << LL_ENDL; xferp->mCallbackResult = LL_ERR_CIRCUIT_GONE; xferp->processEOF(); - delp = xferp; - xferp = xferp->mNext; - removeXfer(delp,&mReceiveList); + + iter = mReceiveList.erase(iter); // iter is set to next one after the deletion point + delete (xferp); continue; } } - xferp = xferp->mNext; + ++iter; } - xferp = mSendList; updateHostStatus(); + F32 et; - while (xferp) + iter = mSendList.begin(); + while (iter != mSendList.end()) { + xferp = (*iter); if (xferp->mWaitingForACK && ( (et = xferp->ACKTimer.getElapsedTimeF32()) > LL_PACKET_TIMEOUT)) { if (xferp->mRetries > LL_PACKET_RETRY_LIMIT) { LL_INFOS() << "dropping xfer " << xferp->mRemoteHost << ":" << xferp->getFileName() << " packet retransmit limit exceeded, xfer dropped" << LL_ENDL; xferp->abort(LL_ERR_TCP_TIMEOUT); - delp = xferp; - xferp = xferp->mNext; - removeXfer(delp,&mSendList); + iter = mSendList.erase(iter); + delete xferp; + continue; } else { @@ -1060,25 +1106,37 @@ void LLXferManager::retransmitUnackedPackets () { LL_INFOS() << "registered xfer never requested, xfer dropped" << LL_ENDL; xferp->abort(LL_ERR_TCP_TIMEOUT); - delp = xferp; - xferp = xferp->mNext; - removeXfer(delp,&mSendList); + iter = mSendList.erase(iter); + delete xferp; + continue; } else if (xferp->mStatus == e_LL_XFER_ABORTED) { LL_WARNS() << "Removing aborted xfer " << xferp->mRemoteHost << ":" << xferp->getFileName() << LL_ENDL; - delp = xferp; - xferp = xferp->mNext; - removeXfer(delp,&mSendList); + iter = mSendList.erase(iter); + delete xferp; + continue; } else if (xferp->mStatus == e_LL_XFER_PENDING) { // LL_INFOS() << "*** numActiveXfers = " << numActiveXfers(xferp->mRemoteHost) << " mMaxOutgoingXfersPerCircuit = " << mMaxOutgoingXfersPerCircuit << LL_ENDL; if (numActiveXfers(xferp->mRemoteHost) < mMaxOutgoingXfersPerCircuit) { -// LL_INFOS() << "bumping pending xfer to active" << LL_ENDL; - xferp->sendNextPacket(); - changeNumActiveXfers(xferp->mRemoteHost,1); + if (xferp->reopenFileHandle()) + { + LL_WARNS("Xfer") << "Error re-opening file handle for xfer ID " << U64_to_str(xferp->mID) + << " to host " << xferp->mRemoteHost << LL_ENDL; + xferp->abort(LL_ERR_CANNOT_OPEN_FILE); + iter = mSendList.erase(iter); + delete xferp; + continue; + } + else + { // No error re-opening the file, send the first packet + LL_DEBUGS("Xfer") << "Moving pending xfer ID " << U64_to_str(xferp->mID) << " to active" << LL_ENDL; + xferp->sendNextPacket(); + changeNumActiveXfers(xferp->mRemoteHost,1); + } } xferp = xferp->mNext; } @@ -1124,7 +1182,7 @@ void LLXferManager::abortRequestById(U64 xfer_id, S32 result_code) { xferp->mCallbackResult = result_code; xferp->processEOF(); //should notify requester - removeXfer(xferp, &mReceiveList); + removeXfer(xferp, mReceiveList); } // Since already removed or marked as aborted no need // to wait for processAbort() to start new download @@ -1148,7 +1206,7 @@ void LLXferManager::processAbort (LLMessageSystem *mesgsys, void ** /*user_data* { xferp->mCallbackResult = result_code; xferp->processEOF(); - removeXfer(xferp, &mReceiveList); + removeXfer(xferp, mReceiveList); startPendingDownloads(); } } @@ -1164,12 +1222,15 @@ void LLXferManager::startPendingDownloads() // requests get pushed toward the back. Thus, if we didn't do a // stateful iteration, it would be possible for old requests to // never start. - LLXfer* xferp = mReceiveList; + LLXfer* xferp; std::list<LLXfer*> pending_downloads; S32 download_count = 0; S32 pending_count = 0; - while(xferp) + for (xfer_list_t::iterator iter = mReceiveList.begin(); + iter != mReceiveList.end(); + ++iter) { + xferp = (*iter); if(xferp->mStatus == e_LL_XFER_PENDING) { ++pending_count; @@ -1209,29 +1270,15 @@ void LLXferManager::startPendingDownloads() /////////////////////////////////////////////////////////// -void LLXferManager::addToList(LLXfer* xferp, LLXfer*& head, BOOL is_priority) +void LLXferManager::addToList(LLXfer* xferp, xfer_list_t & xfer_list, BOOL is_priority) { if(is_priority) { - xferp->mNext = NULL; - LLXfer* next = head; - if(next) - { - while(next->mNext) - { - next = next->mNext; - } - next->mNext = xferp; - } - else - { - head = xferp; - } + xfer_list.push_back(xferp); } else { - xferp->mNext = head; - head = xferp; + xfer_list.push_front(xferp); } } diff --git a/indra/llmessage/llxfermanager.h b/indra/llmessage/llxfermanager.h index d258f0a5ce..ebe8e57ee2 100644 --- a/indra/llmessage/llxfermanager.h +++ b/indra/llmessage/llxfermanager.h @@ -77,6 +77,7 @@ class LLXferManager protected: S32 mMaxOutgoingXfersPerCircuit; + S32 mHardLimitOutgoingXfersPerCircuit; // At this limit, kill off the connection S32 mMaxIncomingXfers; BOOL mUseAckThrottling; // Use ack throttling to cap file xfer bandwidth @@ -92,8 +93,10 @@ class LLXferManager HIGH_PRIORITY = TRUE, }; - LLXfer *mSendList; - LLXfer *mReceiveList; + // Linked FIFO list, add to the front and pull from back + typedef std::deque<LLXfer *> xfer_list_t; + xfer_list_t mSendList; + xfer_list_t mReceiveList; typedef std::list<LLHostStatus*> status_list_t; status_list_t mOutgoingHosts; @@ -102,7 +105,7 @@ class LLXferManager protected: // implementation methods virtual void startPendingDownloads(); - virtual void addToList(LLXfer* xferp, LLXfer*& head, BOOL is_priority); + virtual void addToList(LLXfer* xferp, xfer_list_t & list, BOOL is_priority); std::multiset<std::string> mExpectedTransfers; // files that are authorized to transfer out std::multiset<std::string> mExpectedRequests; // files that are authorized to be downloaded on top of @@ -117,14 +120,18 @@ class LLXferManager void setAckThrottleBPS(const F32 bps); // list management routines - virtual LLXfer *findXfer(U64 id, LLXfer *list_head); - virtual void removeXfer (LLXfer *delp, LLXfer **list_head); + virtual LLXfer *findXfer(U64 id, xfer_list_t & xfer_list); + virtual void removeXfer (LLXfer *delp, xfer_list_t & xfer_list); + + LLHostStatus * findHostStatus(const LLHost &host); virtual U32 numActiveListEntries(LLXfer *list_head); virtual S32 numActiveXfers(const LLHost &host); virtual S32 numPendingXfers(const LLHost &host); + virtual void changeNumActiveXfers(const LLHost &host, S32 delta); virtual void setMaxOutgoingXfersPerCircuit (S32 max_num); + virtual void setHardLimitOutgoingXfersPerCircuit(S32 max_num); virtual void setMaxIncomingXfers(S32 max_num); virtual void updateHostStatus(); virtual void printHostStatus(); @@ -136,8 +143,6 @@ class LLXferManager virtual S32 decodePacketNum(S32 packet_num); virtual BOOL isLastPacket(S32 packet_num); - virtual U64 registerXfer(const void *datap, const S32 length); - // file requesting routines // .. to file virtual U64 requestFile(const std::string& local_filename, @@ -204,6 +209,8 @@ class LLXferManager // error handling void abortRequestById(U64 xfer_id, S32 result_code); virtual void processAbort (LLMessageSystem *mesgsys, void **user_data); + + virtual bool isHostFlooded(const LLHost & host); }; extern LLXferManager* gXferManager; |