diff options
Diffstat (limited to 'indra/llmessage/llxfermanager.cpp')
-rw-r--r-- | indra/llmessage/llxfermanager.cpp | 1133 |
1 files changed, 1133 insertions, 0 deletions
diff --git a/indra/llmessage/llxfermanager.cpp b/indra/llmessage/llxfermanager.cpp new file mode 100644 index 0000000000..e2d8cd30b3 --- /dev/null +++ b/indra/llmessage/llxfermanager.cpp @@ -0,0 +1,1133 @@ +/** + * @file llxfermanager.cpp + * @brief implementation of LLXferManager class for a collection of xfers + * + * Copyright (c) 2001-$CurrentYear$, Linden Research, Inc. + * $License$ + */ + +#include "linden_common.h" + +#include "llxfermanager.h" + +#include "llxfer.h" +#include "llxfer_file.h" +#include "llxfer_mem.h" +#include "llxfer_vfile.h" + +#include "llerror.h" +#include "lluuid.h" +#include "u64.h" + +const F32 LL_XFER_REGISTRATION_TIMEOUT = 60.0f; // timeout if a registered transfer hasn't been requested in 60 seconds +const F32 LL_PACKET_TIMEOUT = 3.0f; // packet timeout at 3 s +const S32 LL_PACKET_RETRY_LIMIT = 10; // packet retransmission limit + +const S32 LL_DEFAULT_MAX_SIMULTANEOUS_XFERS = 10; +const S32 LL_DEFAULT_MAX_REQUEST_FIFO_XFERS = 1000; + +#define LL_XFER_PROGRESS_MESSAGES 0 +#define LL_XFER_TEST_REXMIT 0 + + +/////////////////////////////////////////////////////////// + +LLXferManager::LLXferManager (LLVFS *vfs) +{ + init(vfs); +} + +/////////////////////////////////////////////////////////// + +LLXferManager::~LLXferManager () +{ + free(); +} + +/////////////////////////////////////////////////////////// + +void LLXferManager::init (LLVFS *vfs) +{ + mSendList = NULL; + mReceiveList = NULL; + + setMaxOutgoingXfersPerCircuit(LL_DEFAULT_MAX_SIMULTANEOUS_XFERS); + setMaxIncomingXfers(LL_DEFAULT_MAX_REQUEST_FIFO_XFERS); + + mVFS = vfs; + + // Turn on or off ack throttling + mUseAckThrottling = FALSE; + setAckThrottleBPS(100000); +} + +/////////////////////////////////////////////////////////// + +void LLXferManager::free () +{ + LLXfer *xferp; + LLXfer *delp; + + mOutgoingHosts.deleteAllData(); + + delp = mSendList; + while (delp) + { + xferp = delp->mNext; + delete delp; + delp = xferp; + } + mSendList = NULL; + + delp = mReceiveList; + while (delp) + { + xferp = delp->mNext; + delete delp; + delp = xferp; + } + mReceiveList = NULL; +} + +/////////////////////////////////////////////////////////// + +void LLXferManager::setMaxIncomingXfers(S32 max_num) +{ + mMaxIncomingXfers = max_num; +} + +/////////////////////////////////////////////////////////// + +void LLXferManager::setMaxOutgoingXfersPerCircuit(S32 max_num) +{ + mMaxOutgoingXfersPerCircuit = max_num; +} + +void LLXferManager::setUseAckThrottling(const BOOL use) +{ + mUseAckThrottling = use; +} + +void LLXferManager::setAckThrottleBPS(const F32 bps) +{ + // Let's figure out the min we can set based on the ack retry rate + // and number of simultaneous. + + // Assuming we're running as slow as possible, this is the lowest ack + // rate we can use. + F32 min_bps = (1000.f * 8.f* mMaxIncomingXfers) / LL_PACKET_TIMEOUT; + + // Set + F32 actual_rate = llmax(min_bps*1.1f, bps); + llinfos << "LLXferManager ack throttle min rate: " << min_bps << llendl; + llinfos << "LLXferManager ack throttle actual rate: " << actual_rate << llendl; + mAckThrottle.setRate(actual_rate); +} + + +/////////////////////////////////////////////////////////// + +void LLXferManager::updateHostStatus() +{ + LLXfer *xferp; + LLHostStatus *host_statusp = NULL; + + mOutgoingHosts.deleteAllData(); + + for (xferp = mSendList; xferp; xferp = xferp->mNext) + { + for (host_statusp = mOutgoingHosts.getFirstData(); host_statusp; host_statusp = mOutgoingHosts.getNextData()) + { + if (host_statusp->mHost == xferp->mRemoteHost) + { + break; + } + } + if (!host_statusp) + { + host_statusp = new LLHostStatus(); + if (host_statusp) + { + host_statusp->mHost = xferp->mRemoteHost; + mOutgoingHosts.addData(host_statusp); + } + } + if (host_statusp) + { + if (xferp->mStatus == e_LL_XFER_PENDING) + { + host_statusp->mNumPending++; + } + else if (xferp->mStatus == e_LL_XFER_IN_PROGRESS) + { + host_statusp->mNumActive++; + } + } + + } +} + +/////////////////////////////////////////////////////////// + +void LLXferManager::printHostStatus() +{ + LLHostStatus *host_statusp = NULL; + if (mOutgoingHosts.getFirstData()) + { + llinfos << "Outgoing Xfers:" << llendl; + + for (host_statusp = mOutgoingHosts.getFirstData(); host_statusp; host_statusp = mOutgoingHosts.getNextData()) + { + llinfos << " " << host_statusp->mHost << " active: " << host_statusp->mNumActive << " pending: " << host_statusp->mNumPending << llendl; + } + } +} + +/////////////////////////////////////////////////////////// + +LLXfer *LLXferManager::findXfer (U64 id, LLXfer *list_head) +{ + LLXfer *xferp; + for (xferp = list_head; xferp; xferp = xferp->mNext) + { + if (xferp->mID == id) + { + return(xferp); + } + } + return(NULL); +} + + +/////////////////////////////////////////////////////////// + +void LLXferManager::removeXfer (LLXfer *delp, LLXfer **list_head) +{ + LLXfer *xferp; + + if (delp) + { + if (*list_head == delp) + { + *list_head = delp->mNext; + delete (delp); + } + else + { + xferp = *list_head; + while (xferp->mNext) + { + if (xferp->mNext == delp) + { + xferp->mNext = delp->mNext; + delete (delp); + continue; + } + xferp = xferp->mNext; + } + } + } +} + +/////////////////////////////////////////////////////////// + +U32 LLXferManager::numActiveListEntries(LLXfer *list_head) +{ + U32 num_entries = 0; + + while (list_head) + { + if ((list_head->mStatus == e_LL_XFER_IN_PROGRESS)) + { + num_entries++; + } + list_head = list_head->mNext; + } + return(num_entries); +} + +/////////////////////////////////////////////////////////// + +S32 LLXferManager::numPendingXfers(const LLHost &host) +{ + LLHostStatus *host_statusp = NULL; + + for (host_statusp = mOutgoingHosts.getFirstData(); host_statusp; host_statusp = mOutgoingHosts.getNextData()) + { + if (host_statusp->mHost == host) + { + return (host_statusp->mNumPending); + } + } + return 0; +} + +/////////////////////////////////////////////////////////// + +S32 LLXferManager::numActiveXfers(const LLHost &host) +{ + LLHostStatus *host_statusp = NULL; + + for (host_statusp = mOutgoingHosts.getFirstData(); host_statusp; host_statusp = mOutgoingHosts.getNextData()) + { + if (host_statusp->mHost == host) + { + return (host_statusp->mNumActive); + } + } + return 0; +} + +/////////////////////////////////////////////////////////// + +void LLXferManager::changeNumActiveXfers(const LLHost &host, S32 delta) +{ + LLHostStatus *host_statusp = NULL; + + for (host_statusp = mOutgoingHosts.getFirstData(); host_statusp; host_statusp = mOutgoingHosts.getNextData()) + { + if (host_statusp->mHost == host) + { + host_statusp->mNumActive += delta; + } + } +} + +/////////////////////////////////////////////////////////// + +void LLXferManager::registerCallbacks(LLMessageSystem *msgsystem) +{ + msgsystem->setHandlerFuncFast(_PREHASH_ConfirmXferPacket, process_confirm_packet, NULL); + msgsystem->setHandlerFuncFast(_PREHASH_RequestXfer, process_request_xfer, NULL); + msgsystem->setHandlerFuncFast(_PREHASH_SendXferPacket, continue_file_receive, NULL); + msgsystem->setHandlerFuncFast(_PREHASH_AbortXfer, process_abort_xfer, NULL); +} + +/////////////////////////////////////////////////////////// + +U64 LLXferManager::getNextID () +{ + LLUUID a_guid; + + a_guid.generate(); + + + return(*((U64*)(a_guid.mData))); +} + +/////////////////////////////////////////////////////////// + +S32 LLXferManager::encodePacketNum(S32 packet_num, BOOL is_EOF) +{ + if (is_EOF) + { + packet_num |= 0x80000000; + } + return packet_num; +} + +/////////////////////////////////////////////////////////// + +S32 LLXferManager::decodePacketNum(S32 packet_num) +{ + return(packet_num & 0x0FFFFFFF); +} + +/////////////////////////////////////////////////////////// + +BOOL LLXferManager::isLastPacket(S32 packet_num) +{ + return(packet_num & 0x80000000); +} + +/////////////////////////////////////////////////////////// + +U64 LLXferManager::registerXfer(const void *datap, const S32 length) +{ + LLXfer *xferp; + U64 xfer_id = getNextID(); + + xferp = (LLXfer *) new LLXfer_Mem(); + if (xferp) + { + xferp->mNext = mSendList; + mSendList = xferp; + + xfer_id = ((LLXfer_Mem *)xferp)->registerXfer(xfer_id, datap,length); + + if (!xfer_id) + { + removeXfer(xferp,&mSendList); + } + } + else + { + llerrs << "Xfer allocation error" << llendl; + xfer_id = 0; + } + + return(xfer_id); +} + +/////////////////////////////////////////////////////////// + +void LLXferManager::requestFile(const char* local_filename, + const char* remote_filename, + ELLPath remote_path, + const LLHost& remote_host, + BOOL delete_remote_on_completion, + void (*callback)(void**,S32), + void** user_data, + BOOL is_priority, + BOOL use_big_packets) +{ + LLXfer *xferp; + + for (xferp = mReceiveList; xferp ; xferp = xferp->mNext) + { + if (xferp->getXferTypeTag() == LLXfer::XFER_FILE + && (((LLXfer_File*)xferp)->matchesLocalFilename(local_filename)) + && (((LLXfer_File*)xferp)->matchesRemoteFilename(remote_filename, remote_path)) + && (remote_host == xferp->mRemoteHost) + && (callback == xferp->mCallback) + && (user_data == xferp->mCallbackDataHandle)) + + { + // cout << "requested a xfer already in progress" << endl; + return; + } + } + + S32 chunk_size = use_big_packets ? LL_XFER_LARGE_PAYLOAD : -1; + xferp = (LLXfer *) new LLXfer_File(chunk_size); + if (xferp) + { + addToList(xferp, mReceiveList, is_priority); + + // Remove any file by the same name that happens to be lying + // around. + // Note: according to AaronB, this is here to deal with locks on files that were + // in transit during a crash, + if(delete_remote_on_completion && + (strstr(remote_filename,".tmp") == &remote_filename[strlen(remote_filename)-4])) /* Flawfinder : ignore */ + { + LLFile::remove(local_filename); + } + ((LLXfer_File *)xferp)->initializeRequest( + getNextID(), + local_filename, + remote_filename, + remote_path, + remote_host, + delete_remote_on_completion, + callback,user_data); + startPendingDownloads(); + } + else + { + llerrs << "Xfer allocation error" << llendl; + } +} + +void LLXferManager::requestFile(const char* remote_filename, + ELLPath remote_path, + const LLHost& remote_host, + BOOL delete_remote_on_completion, + void (*callback)(void*,S32,void**,S32), + void** user_data, + BOOL is_priority) +{ + LLXfer *xferp; + + xferp = (LLXfer *) new LLXfer_Mem(); + if (xferp) + { + addToList(xferp, mReceiveList, is_priority); + ((LLXfer_Mem *)xferp)->initializeRequest(getNextID(), + remote_filename, + remote_path, + remote_host, + delete_remote_on_completion, + callback, user_data); + startPendingDownloads(); + } + else + { + llerrs << "Xfer allocation error" << llendl; + } +} + +void LLXferManager::requestVFile(const LLUUID& local_id, + const LLUUID& remote_id, + LLAssetType::EType type, LLVFS* vfs, + const LLHost& remote_host, + void (*callback)(void**, S32), + void** user_data, + BOOL is_priority) +{ + LLXfer *xferp; + + for (xferp = mReceiveList; xferp ; xferp = xferp->mNext) + { + if (xferp->getXferTypeTag() == LLXfer::XFER_VFILE + && (((LLXfer_VFile*)xferp)->matchesLocalFile(local_id, type)) + && (((LLXfer_VFile*)xferp)->matchesRemoteFile(remote_id, type)) + && (remote_host == xferp->mRemoteHost) + && (callback == xferp->mCallback) + && (user_data == xferp->mCallbackDataHandle)) + + { + // cout << "requested a xfer already in progress" << endl; + return; + } + } + + xferp = (LLXfer *) new LLXfer_VFile(); + if (xferp) + { + addToList(xferp, mReceiveList, is_priority); + ((LLXfer_VFile *)xferp)->initializeRequest(getNextID(), + vfs, + local_id, + remote_id, + type, + remote_host, + callback, + user_data); + startPendingDownloads(); + } + else + { + llerrs << "Xfer allocation error" << llendl; + } + +} + +/* +void LLXferManager::requestXfer( + const char *local_filename, + BOOL delete_remote_on_completion, + U64 xfer_id, + const LLHost &remote_host, + void (*callback)(void **,S32), + void **user_data) +{ + LLXfer *xferp; + + for (xferp = mReceiveList; xferp ; xferp = xferp->mNext) + { + if (xferp->getXferTypeTag() == LLXfer::XFER_FILE + && (((LLXfer_File*)xferp)->matchesLocalFilename(local_filename)) + && (xfer_id == xferp->mID) + && (remote_host == xferp->mRemoteHost) + && (callback == xferp->mCallback) + && (user_data == xferp->mCallbackDataHandle)) + + { + // cout << "requested a xfer already in progress" << endl; + return; + } + } + + xferp = (LLXfer *) new LLXfer_File(); + if (xferp) + { + xferp->mNext = mReceiveList; + mReceiveList = xferp; + + ((LLXfer_File *)xferp)->initializeRequest(xfer_id,local_filename,"",LL_PATH_NONE,remote_host,delete_remote_on_completion,callback,user_data); + startPendingDownloads(); + } + else + { + llerrs << "Xfer allcoation error" << llendl; + } +} + +void LLXferManager::requestXfer(U64 xfer_id, const LLHost &remote_host, BOOL delete_remote_on_completion, void (*callback)(void *,S32,void **,S32),void **user_data) +{ + LLXfer *xferp; + + xferp = (LLXfer *) new LLXfer_Mem(); + if (xferp) + { + xferp->mNext = mReceiveList; + mReceiveList = xferp; + + ((LLXfer_Mem *)xferp)->initializeRequest(xfer_id,"",LL_PATH_NONE,remote_host,delete_remote_on_completion,callback,user_data); + startPendingDownloads(); + } + else + { + llerrs << "Xfer allcoation error" << llendl; + } +} +*/ +/////////////////////////////////////////////////////////// + +void LLXferManager::processReceiveData (LLMessageSystem *mesgsys, void ** /*user_data*/) +{ + // there's sometimes an extra 4 bytes added to an xfer payload + const S32 BUF_SIZE = LL_XFER_LARGE_PAYLOAD + 4; + char fdata_buf[LL_XFER_LARGE_PAYLOAD + 4]; /* Flawfinder : ignore */ + S32 fdata_size; + U64 id; + S32 packetnum; + LLXfer * xferp; + + mesgsys->getU64Fast(_PREHASH_XferID, _PREHASH_ID, id); + mesgsys->getS32Fast(_PREHASH_XferID, _PREHASH_Packet, packetnum); + + fdata_size = mesgsys->getSizeFast(_PREHASH_DataPacket,_PREHASH_Data); + mesgsys->getBinaryDataFast(_PREHASH_DataPacket, _PREHASH_Data, fdata_buf, 0, 0, BUF_SIZE); + + xferp = findXfer(id, mReceiveList); + + if (!xferp) + { + char U64_BUF[MAX_STRING]; /* Flawfinder : ignore */ + llwarns << "received xfer data from " << mesgsys->getSender() + << " for non-existent xfer id: " + << U64_to_str(id, U64_BUF, sizeof(U64_BUF)) << llendl; + return; + } + + S32 xfer_size; + + if (decodePacketNum(packetnum) != xferp->mPacketNum) // is the packet different from what we were expecting? + { + // confirm it if it was a resend of the last one, since the confirmation might have gotten dropped + if (decodePacketNum(packetnum) == (xferp->mPacketNum - 1)) + { + llinfos << "Reconfirming xfer " << xferp->mRemoteHost << ":" << xferp->getName() << " packet " << packetnum << llendl; sendConfirmPacket(mesgsys, id, decodePacketNum(packetnum), mesgsys->getSender()); + } + else + { + llinfos << "Ignoring xfer " << xferp->mRemoteHost << ":" << xferp->getName() << " recv'd packet " << packetnum << "; expecting " << xferp->mPacketNum << llendl; + } + return; + } + + S32 result = 0; + + if (xferp->mPacketNum == 0) // first packet has size encoded as additional S32 at beginning of data + { + ntohmemcpy(&xfer_size,fdata_buf,MVT_S32,sizeof(S32)); + +// do any necessary things on first packet ie. allocate memory + xferp->setXferSize(xfer_size); + + // adjust buffer start and size + result = xferp->receiveData(&(fdata_buf[sizeof(S32)]),fdata_size-(sizeof(S32))); + } + else + { + result = xferp->receiveData(fdata_buf,fdata_size); + } + + if (result == LL_ERR_CANNOT_OPEN_FILE) + { + xferp->abort(LL_ERR_CANNOT_OPEN_FILE); + removeXfer(xferp,&mReceiveList); + startPendingDownloads(); + return; + } + + xferp->mPacketNum++; // expect next packet + + if (!mUseAckThrottling) + { + // No throttling, confirm right away + sendConfirmPacket(mesgsys, id, decodePacketNum(packetnum), mesgsys->getSender()); + } + else + { + // Throttling, put on queue to be confirmed later. + LLXferAckInfo ack_info; + ack_info.mID = id; + ack_info.mPacketNum = decodePacketNum(packetnum); + ack_info.mRemoteHost = mesgsys->getSender(); + mXferAckQueue.push(ack_info); + } + + if (isLastPacket(packetnum)) + { + xferp->processEOF(); + removeXfer(xferp,&mReceiveList); + startPendingDownloads(); + } +} + +/////////////////////////////////////////////////////////// + +void LLXferManager::sendConfirmPacket (LLMessageSystem *mesgsys, U64 id, S32 packetnum, const LLHost &remote_host) +{ +#if LL_XFER_PROGRESS_MESSAGES + if (!(packetnum % 50)) + { + cout << "confirming xfer packet #" << packetnum << endl; + } +#endif + mesgsys->newMessageFast(_PREHASH_ConfirmXferPacket); + mesgsys->nextBlockFast(_PREHASH_XferID); + mesgsys->addU64Fast(_PREHASH_ID, id); + mesgsys->addU32Fast(_PREHASH_Packet, packetnum); + + mesgsys->sendMessage(remote_host); +} + +/////////////////////////////////////////////////////////// + +void LLXferManager::processFileRequest (LLMessageSystem *mesgsys, void ** /*user_data*/) +{ + + U64 id; + char local_filename[MAX_STRING]; /* Flawfinder : ignore */ + ELLPath local_path = LL_PATH_NONE; + S32 result = LL_ERR_NOERR; + LLUUID uuid; + LLAssetType::EType type; + S16 type_s16; + BOOL b_use_big_packets; + + mesgsys->getBOOL("XferID", "UseBigPackets", b_use_big_packets); + + mesgsys->getU64Fast(_PREHASH_XferID, _PREHASH_ID, id); + char U64_BUF[MAX_STRING]; /* Flawfinder : ignore */ + llinfos << "xfer request id: " << U64_to_str(id, U64_BUF, sizeof(U64_BUF)) + << " to " << mesgsys->getSender() << llendl; + + mesgsys->getStringFast(_PREHASH_XferID, _PREHASH_Filename, MAX_STRING, local_filename); + + U8 local_path_u8; + mesgsys->getU8("XferID", "FilePath", local_path_u8); + if( local_path_u8 < (U8)LL_PATH_COUNT ) + { + local_path = (ELLPath)local_path_u8; + } + else + { + llwarns << "Invalid file path in LLXferManager::processFileRequest() " << (U32)local_path_u8 << llendl; + } + + mesgsys->getUUIDFast(_PREHASH_XferID, _PREHASH_VFileID, uuid); + mesgsys->getS16Fast(_PREHASH_XferID, _PREHASH_VFileType, type_s16); + type = (LLAssetType::EType)type_s16; + + LLXfer *xferp; + + if (uuid != LLUUID::null) + { + if(NULL == LLAssetType::lookup(type)) + { + llwarns << "Invalid type for xfer request: " << uuid << ":" + << type_s16 << " to " << mesgsys->getSender() << llendl; + return; + } + + llinfos << "starting vfile transfer: " << uuid << "," << LLAssetType::lookup(type) << " to " << mesgsys->getSender() << llendl; + + if (! mVFS) + { + llwarns << "Attempt to send VFile w/o available VFS" << llendl; + return; + } + + xferp = (LLXfer *)new LLXfer_VFile(mVFS, uuid, type); + if (xferp) + { + xferp->mNext = mSendList; + mSendList = xferp; + result = xferp->startSend(id,mesgsys->getSender()); + } + else + { + llerrs << "Xfer allcoation error" << llendl; + } + } + else if (strlen(local_filename)) /* Flawfinder : ignore */ + { + std::string expanded_filename = gDirUtilp->getExpandedFilename( local_path, local_filename ); + llinfos << "starting file transfer: " << expanded_filename << " to " << mesgsys->getSender() << llendl; + + BOOL delete_local_on_completion = FALSE; + mesgsys->getBOOL("XferID", "DeleteOnCompletion", delete_local_on_completion); + + // -1 chunk_size causes it to use the default + xferp = (LLXfer *)new LLXfer_File(expanded_filename, delete_local_on_completion, b_use_big_packets ? LL_XFER_LARGE_PAYLOAD : -1); + + if (xferp) + { + xferp->mNext = mSendList; + mSendList = xferp; + result = xferp->startSend(id,mesgsys->getSender()); + } + else + { + llerrs << "Xfer allcoation error" << llendl; + } + } + else + { + char U64_BUF[MAX_STRING]; /* Flawfinder : ignore */ + llinfos << "starting memory transfer: " + << U64_to_str(id, U64_BUF, sizeof(U64_BUF)) << " to " + << mesgsys->getSender() << llendl; + + xferp = findXfer(id, mSendList); + + if (xferp) + { + result = xferp->startSend(id,mesgsys->getSender()); + } + else + { + llinfos << "Warning: " << U64_BUF << " not found." << llendl; + result = LL_ERR_FILE_NOT_FOUND; + } + } + + if (result) + { + if (xferp) + { + xferp->abort(result); + removeXfer(xferp,&mSendList); + } + else // can happen with a memory transfer not found + { + llinfos << "Aborting xfer to " << mesgsys->getSender() << " with error: " << result << llendl; + + mesgsys->newMessageFast(_PREHASH_AbortXfer); + mesgsys->nextBlockFast(_PREHASH_XferID); + mesgsys->addU64Fast(_PREHASH_ID, id); + mesgsys->addS32Fast(_PREHASH_Result, result); + + mesgsys->sendMessage(mesgsys->getSender()); + } + } + else if(xferp && (numActiveXfers(xferp->mRemoteHost) < mMaxOutgoingXfersPerCircuit)) + { + xferp->sendNextPacket(); + changeNumActiveXfers(xferp->mRemoteHost,1); +// llinfos << "***STARTING XFER IMMEDIATELY***" << llendl; + } + else + { + if(xferp) + { + llinfos << " queueing xfer request, " << numPendingXfers(xferp->mRemoteHost) << " ahead of this one" << llendl; + } + else + { + llwarns << "LLXferManager::processFileRequest() - no xfer found!" + << llendl; + } + } +} + +/////////////////////////////////////////////////////////// + +void LLXferManager::processConfirmation (LLMessageSystem *mesgsys, void ** /*user_data*/) +{ + U64 id = 0; + S32 packetNum = 0; + + mesgsys->getU64Fast(_PREHASH_XferID, _PREHASH_ID, id); + mesgsys->getS32Fast(_PREHASH_XferID, _PREHASH_Packet, packetNum); + + LLXfer* xferp = findXfer(id, mSendList); + if (xferp) + { +// cout << "confirmed packet #" << packetNum << " ping: "<< xferp->ACKTimer.getElapsedTimeF32() << endl; + xferp->mWaitingForACK = FALSE; + if (xferp->mStatus == e_LL_XFER_IN_PROGRESS) + { + xferp->sendNextPacket(); + } + else + { + removeXfer(xferp, &mSendList); + } + } +} + +/////////////////////////////////////////////////////////// + +void LLXferManager::retransmitUnackedPackets () +{ + LLXfer *xferp; + LLXfer *delp; + xferp = mReceiveList; + while(xferp) + { + if (xferp->mStatus == e_LL_XFER_IN_PROGRESS) + { + // if the circuit dies, abort + if (! gMessageSystem->mCircuitInfo.isCircuitAlive( xferp->mRemoteHost )) + { + llinfos << "Xfer found in progress on dead circuit, aborting" << llendl; + xferp->mCallbackResult = LL_ERR_CIRCUIT_GONE; + xferp->processEOF(); + delp = xferp; + xferp = xferp->mNext; + removeXfer(delp,&mReceiveList); + continue; + } + + } + xferp = xferp->mNext; + } + + xferp = mSendList; + updateHostStatus(); + F32 et; + while (xferp) + { + if (xferp->mWaitingForACK && ( (et = xferp->ACKTimer.getElapsedTimeF32()) > LL_PACKET_TIMEOUT)) + { + if (xferp->mRetries > LL_PACKET_RETRY_LIMIT) + { + llinfos << "dropping xfer " << xferp->mRemoteHost << ":" << xferp->getName() << " packet retransmit limit exceeded, xfer dropped" << llendl; + xferp->abort(LL_ERR_TCP_TIMEOUT); + delp = xferp; + xferp = xferp->mNext; + removeXfer(delp,&mSendList); + } + else + { + llinfos << "resending xfer " << xferp->mRemoteHost << ":" << xferp->getName() << " packet unconfirmed after: "<< et << " sec, packet " << xferp->mPacketNum << llendl; + xferp->resendLastPacket(); + xferp = xferp->mNext; + } + } + else if ((xferp->mStatus == e_LL_XFER_REGISTERED) && ( (et = xferp->ACKTimer.getElapsedTimeF32()) > LL_XFER_REGISTRATION_TIMEOUT)) + { + llinfos << "registered xfer never requested, xfer dropped" << llendl; + xferp->abort(LL_ERR_TCP_TIMEOUT); + delp = xferp; + xferp = xferp->mNext; + removeXfer(delp,&mSendList); + } + else if (xferp->mStatus == e_LL_XFER_ABORTED) + { + llwarns << "Removing aborted xfer " << xferp->mRemoteHost << ":" << xferp->getName() << llendl; + delp = xferp; + xferp = xferp->mNext; + removeXfer(delp,&mSendList); + } + else if (xferp->mStatus == e_LL_XFER_PENDING) + { +// llinfos << "*** numActiveXfers = " << numActiveXfers(xferp->mRemoteHost) << " mMaxOutgoingXfersPerCircuit = " << mMaxOutgoingXfersPerCircuit << llendl; + if (numActiveXfers(xferp->mRemoteHost) < mMaxOutgoingXfersPerCircuit) + { +// llinfos << "bumping pending xfer to active" << llendl; + xferp->sendNextPacket(); + changeNumActiveXfers(xferp->mRemoteHost,1); + } + xferp = xferp->mNext; + } + else + { + xferp = xferp->mNext; + } + } + + // + // HACK - if we're using xfer confirm throttling, throttle our xfer confirms here + // so we don't blow through bandwidth. + // + + while (mXferAckQueue.getLength()) + { + if (mAckThrottle.checkOverflow(1000.0f*8.0f)) + { + break; + } + //llinfos << "Confirm packet queue length:" << mXferAckQueue.getLength() << llendl; + LLXferAckInfo ack_info; + mXferAckQueue.pop(ack_info); + //llinfos << "Sending confirm packet" << llendl; + sendConfirmPacket(gMessageSystem, ack_info.mID, ack_info.mPacketNum, ack_info.mRemoteHost); + mAckThrottle.throttleOverflow(1000.f*8.f); // Assume 1000 bytes/packet + } +} + + +/////////////////////////////////////////////////////////// + +void LLXferManager::processAbort (LLMessageSystem *mesgsys, void ** /*user_data*/) +{ + U64 id = 0; + S32 result_code = 0; + LLXfer * xferp; + + mesgsys->getU64Fast(_PREHASH_XferID, _PREHASH_ID, id); + mesgsys->getS32Fast(_PREHASH_XferID, _PREHASH_Result, result_code); + + xferp = findXfer(id, mReceiveList); + if (xferp) + { + xferp->mCallbackResult = result_code; + xferp->processEOF(); + removeXfer(xferp, &mReceiveList); + startPendingDownloads(); + } +} + +/////////////////////////////////////////////////////////// + +void LLXferManager::startPendingDownloads() +{ + // This method goes through the list, and starts pending + // operations until active downloads == mMaxIncomingXfers. I copy + // the pending xfers into a temporary data structure because the + // xfers are stored as an intrusive linked list where older + // requests get pushed toward the back. Thus, if we didn't do a + // stateful iteration, it would be possible for old requests to + // never start. + LLXfer* xferp = mReceiveList; + LLLinkedList<LLXfer> pending_downloads; + S32 download_count = 0; + S32 pending_count = 0; + while(xferp) + { + if(xferp->mStatus == e_LL_XFER_PENDING) + { + ++pending_count; // getLength() is O(N), so track it here. + pending_downloads.addData(xferp); + } + else if(xferp->mStatus == e_LL_XFER_IN_PROGRESS) + { + ++download_count; + } + xferp = xferp->mNext; + } + + S32 start_count = mMaxIncomingXfers - download_count; + + lldebugs << "LLXferManager::startPendingDownloads() - XFER_IN_PROGRESS: " + << download_count << " XFER_PENDING: " << pending_count + << " startring " << llmin(start_count, pending_count) << llendl; + + if((start_count > 0) && (pending_count > 0)) + { + S32 result; + xferp = pending_downloads.getFirstData(); + while(start_count-- && xferp) + { + result = xferp->startDownload(); + if(result) + { + xferp->abort(result); + ++start_count; + } + xferp = pending_downloads.getNextData(); + } + } +} + +/////////////////////////////////////////////////////////// + +void LLXferManager::addToList(LLXfer* xferp, LLXfer*& head, BOOL is_priority) +{ + if(is_priority) + { + xferp->mNext = NULL; + LLXfer* next = head; + if(next) + { + while(next->mNext) + { + next = next->mNext; + } + next->mNext = xferp; + } + else + { + head = xferp; + } + } + else + { + xferp->mNext = head; + head = xferp; + } +} + +/////////////////////////////////////////////////////////// +// Globals and C routines +/////////////////////////////////////////////////////////// + +LLXferManager *gXferManager = NULL; + + +void start_xfer_manager(LLVFS *vfs) +{ + gXferManager = new LLXferManager(vfs); +} + +void cleanup_xfer_manager() +{ + if (gXferManager) + { + delete(gXferManager); + gXferManager = NULL; + } +} + +void process_confirm_packet (LLMessageSystem *mesgsys, void **user_data) +{ + gXferManager->processConfirmation(mesgsys,user_data); +} + +void process_request_xfer(LLMessageSystem *mesgsys, void **user_data) +{ + gXferManager->processFileRequest(mesgsys,user_data); +} + +void continue_file_receive(LLMessageSystem *mesgsys, void **user_data) +{ +#if LL_TEST_XFER_REXMIT + if (frand(1.f) > 0.05f) + { +#endif + gXferManager->processReceiveData(mesgsys,user_data); +#if LL_TEST_XFER_REXMIT + } + else + { + cout << "oops! dropped a xfer packet" << endl; + } +#endif +} + +void process_abort_xfer(LLMessageSystem *mesgsys, void **user_data) +{ + gXferManager->processAbort(mesgsys,user_data); +} + + + + + + + + + + + + + + + + + + + + + + + + + |