/**
 * @file llxfermanager.cpp
 * @brief implementation of LLXferManager class for a collection of xfers
 *
 * Copyright (c) 2001-$CurrentYear$, Linden Research, Inc.
 * $License$
 */

#include "linden_common.h"

#include "llxfermanager.h"

#include "llxfer.h"
#include "llxfer_file.h"
#include "llxfer_mem.h"
#include "llxfer_vfile.h"

#include "llerror.h"
#include "lluuid.h"
#include "u64.h"

const F32 LL_XFER_REGISTRATION_TIMEOUT = 60.0f;  // timeout if a registered transfer hasn't been requested in 60 seconds
const F32 LL_PACKET_TIMEOUT = 3.0f;              // packet timeout at 3 s
const S32 LL_PACKET_RETRY_LIMIT = 10;            // packet retransmission limit

const S32 LL_DEFAULT_MAX_SIMULTANEOUS_XFERS = 10;
const S32 LL_DEFAULT_MAX_REQUEST_FIFO_XFERS = 1000;

#define LL_XFER_PROGRESS_MESSAGES 0
#define LL_XFER_TEST_REXMIT 0

///////////////////////////////////////////////////////////

// Construct a manager; all setup is delegated to init().
LLXferManager::LLXferManager (LLVFS *vfs)
{
    init(vfs);
}

///////////////////////////////////////////////////////////

// Destroy the manager, releasing every xfer it still holds.
LLXferManager::~LLXferManager ()
{
    free();
}

///////////////////////////////////////////////////////////

// Reset the manager to its defaults: empty send/receive lists, default
// xfer limits, ack throttling off.
void LLXferManager::init (LLVFS *vfs)
{
    mVFS = vfs;

    mReceiveList = NULL;
    mSendList = NULL;

    // mMaxIncomingXfers must be set before setAckThrottleBPS() below,
    // which derives its minimum rate from it.
    setMaxIncomingXfers(LL_DEFAULT_MAX_REQUEST_FIFO_XFERS);
    setMaxOutgoingXfersPerCircuit(LL_DEFAULT_MAX_SIMULTANEOUS_XFERS);

    // Turn on or off ack throttling
    mUseAckThrottling = FALSE;
    setAckThrottleBPS(100000);
}

///////////////////////////////////////////////////////////

// Delete the per-host status records and every xfer on both lists.
void LLXferManager::free ()
{
    mOutgoingHosts.deleteAllData();

    for (LLXfer *cur = mSendList; cur; )
    {
        LLXfer *following = cur->mNext;  // grab the link before the node goes away
        delete cur;
        cur = following;
    }
    mSendList = NULL;

    for (LLXfer *cur = mReceiveList; cur; )
    {
        LLXfer *following = cur->mNext;
        delete cur;
        cur = following;
    }
    mReceiveList = NULL;
}

///////////////////////////////////////////////////////////

// Set the limit used by startPendingDownloads() and the ack-throttle floor.
void LLXferManager::setMaxIncomingXfers(S32 max_num)
{
    mMaxIncomingXfers = max_num;
}

///////////////////////////////////////////////////////////

// Set how many outgoing xfers may be active at once for one remote host.
void LLXferManager::setMaxOutgoingXfersPerCircuit(S32 max_num)
{
    mMaxOutgoingXfersPerCircuit = max_num;
}

// Enable or disable queued (throttled) confirmation packets.
void LLXferManager::setUseAckThrottling(const BOOL use)
{
    mUseAckThrottling = use;
}

// Set the ack throttle rate. The requested rate is raised, if necessary,
// to 110% of the minimum computed from the incoming-xfer limit.
void LLXferManager::setAckThrottleBPS(const F32 bps)
{
    // Let's figure out the min we can set based on the ack retry rate
    // and number of simultaneous.

    // Assuming we're running as slow as possible, this is the lowest ack
    // rate we can use.
    F32 floor_bps = (1000.f * 8.f* mMaxIncomingXfers) / LL_PACKET_TIMEOUT;

    // Set
    F32 chosen_bps = llmax(floor_bps*1.1f, bps);

    llinfos << "LLXferManager ack throttle min rate: " << floor_bps << llendl;
    llinfos << "LLXferManager ack throttle actual rate: " << chosen_bps << llendl;
    mAckThrottle.setRate(chosen_bps);
}

///////////////////////////////////////////////////////////

// Rebuild mOutgoingHosts from scratch: one record per distinct remote
// host on the send list, counting its pending and in-progress xfers.
void LLXferManager::updateHostStatus()
{
    mOutgoingHosts.deleteAllData();

    for (LLXfer *xferp = mSendList; xferp; xferp = xferp->mNext)
    {
        // Look for an existing record for this xfer's host.
        LLHostStatus *entry = mOutgoingHosts.getFirstData();
        while (entry && !(entry->mHost == xferp->mRemoteHost))
        {
            entry = mOutgoingHosts.getNextData();
        }

        // None yet: create one and add it to the list.
        if (!entry)
        {
            entry = new LLHostStatus();
            if (entry)
            {
                entry->mHost = xferp->mRemoteHost;
                mOutgoingHosts.addData(entry);
            }
        }

        if (!entry)
        {
            continue;
        }

        if (xferp->mStatus == e_LL_XFER_PENDING)
        {
            entry->mNumPending++;
        }
        else if (xferp->mStatus == e_LL_XFER_IN_PROGRESS)
        {
            entry->mNumActive++;
        }
    }
}

///////////////////////////////////////////////////////////

// Log the active/pending counts of every record in mOutgoingHosts.
// Logs nothing when the list is empty.
void LLXferManager::printHostStatus()
{
    if (!mOutgoingHosts.getFirstData())
    {
        return;
    }

    llinfos << "Outgoing Xfers:" << llendl;

    LLHostStatus *entry = mOutgoingHosts.getFirstData();
    while (entry)
    {
        llinfos << " " << entry->mHost << " active: " << entry->mNumActive << " pending: " << entry->mNumPending << llendl;
        entry = mOutgoingHosts.getNextData();
    }
}
/////////////////////////////////////////////////////////// LLXfer *LLXferManager::findXfer (U64 id, LLXfer *list_head) { LLXfer *xferp; for (xferp = list_head; xferp; xferp = xferp->mNext) { if (xferp->mID == id) { return(xferp); } } return(NULL); } /////////////////////////////////////////////////////////// void LLXferManager::removeXfer (LLXfer *delp, LLXfer **list_head) { LLXfer *xferp; if (delp) { if (*list_head == delp) { *list_head = delp->mNext; delete (delp); } else { xferp = *list_head; while (xferp->mNext) { if (xferp->mNext == delp) { xferp->mNext = delp->mNext; delete (delp); continue; } xferp = xferp->mNext; } } } } /////////////////////////////////////////////////////////// U32 LLXferManager::numActiveListEntries(LLXfer *list_head) { U32 num_entries = 0; while (list_head) { if ((list_head->mStatus == e_LL_XFER_IN_PROGRESS)) { num_entries++; } list_head = list_head->mNext; } return(num_entries); } /////////////////////////////////////////////////////////// S32 LLXferManager::numPendingXfers(const LLHost &host) { LLHostStatus *host_statusp = NULL; for (host_statusp = mOutgoingHosts.getFirstData(); host_statusp; host_statusp = mOutgoingHosts.getNextData()) { if (host_statusp->mHost == host) { return (host_statusp->mNumPending); } } return 0; } /////////////////////////////////////////////////////////// S32 LLXferManager::numActiveXfers(const LLHost &host) { LLHostStatus *host_statusp = NULL; for (host_statusp = mOutgoingHosts.getFirstData(); host_statusp; host_statusp = mOutgoingHosts.getNextData()) { if (host_statusp->mHost == host) { return (host_statusp->mNumActive); } } return 0; } /////////////////////////////////////////////////////////// void LLXferManager::changeNumActiveXfers(const LLHost &host, S32 delta) { LLHostStatus *host_statusp = NULL; for (host_statusp = mOutgoingHosts.getFirstData(); host_statusp; host_statusp = mOutgoingHosts.getNextData()) { if (host_statusp->mHost == host) { host_statusp->mNumActive += delta; } } } 
/////////////////////////////////////////////////////////// void LLXferManager::registerCallbacks(LLMessageSystem *msgsystem) { msgsystem->setHandlerFuncFast(_PREHASH_ConfirmXferPacket, process_confirm_packet, NULL); msgsystem->setHandlerFuncFast(_PREHASH_RequestXfer, process_request_xfer, NULL); msgsystem->setHandlerFuncFast(_PREHASH_SendXferPacket, continue_file_receive, NULL); msgsystem->setHandlerFuncFast(_PREHASH_AbortXfer, process_abort_xfer, NULL); } /////////////////////////////////////////////////////////// U64 LLXferManager::getNextID () { LLUUID a_guid; a_guid.generate(); return(*((U64*)(a_guid.mData))); } /////////////////////////////////////////////////////////// S32 LLXferManager::encodePacketNum(S32 packet_num, BOOL is_EOF) { if (is_EOF) { packet_num |= 0x80000000; } return packet_num; } /////////////////////////////////////////////////////////// S32 LLXferManager::decodePacketNum(S32 packet_num) { return(packet_num & 0x0FFFFFFF); } /////////////////////////////////////////////////////////// BOOL LLXferManager::isLastPacket(S32 packet_num) { return(packet_num & 0x80000000); } /////////////////////////////////////////////////////////// U64 LLXferManager::registerXfer(const void *datap, const S32 length) { LLXfer *xferp; U64 xfer_id = getNextID(); xferp = (LLXfer *) new LLXfer_Mem(); if (xferp) { xferp->mNext = mSendList; mSendList = xferp; xfer_id = ((LLXfer_Mem *)xferp)->registerXfer(xfer_id, datap,length); if (!xfer_id) { removeXfer(xferp,&mSendList); } } else { llerrs << "Xfer allocation error" << llendl; xfer_id = 0; } return(xfer_id); } /////////////////////////////////////////////////////////// void LLXferManager::requestFile(const char* local_filename, const char* remote_filename, ELLPath remote_path, const LLHost& remote_host, BOOL delete_remote_on_completion, void (*callback)(void**,S32), void** user_data, BOOL is_priority, BOOL use_big_packets) { LLXfer *xferp; for (xferp = mReceiveList; xferp ; xferp = xferp->mNext) { if 
(xferp->getXferTypeTag() == LLXfer::XFER_FILE && (((LLXfer_File*)xferp)->matchesLocalFilename(local_filename)) && (((LLXfer_File*)xferp)->matchesRemoteFilename(remote_filename, remote_path)) && (remote_host == xferp->mRemoteHost) && (callback == xferp->mCallback) && (user_data == xferp->mCallbackDataHandle)) { // cout << "requested a xfer already in progress" << endl; return; } } S32 chunk_size = use_big_packets ? LL_XFER_LARGE_PAYLOAD : -1; xferp = (LLXfer *) new LLXfer_File(chunk_size); if (xferp) { addToList(xferp, mReceiveList, is_priority); // Remove any file by the same name that happens to be lying // around. // Note: according to AaronB, this is here to deal with locks on files that were // in transit during a crash, if(delete_remote_on_completion && (strstr(remote_filename,".tmp") == &remote_filename[strlen(remote_filename)-4])) /* Flawfinder : ignore */ { LLFile::remove(local_filename); } ((LLXfer_File *)xferp)->initializeRequest( getNextID(), local_filename, remote_filename, remote_path, remote_host, delete_remote_on_completion, callback,user_data); startPendingDownloads(); } else { llerrs << "Xfer allocation error" << llendl; } } void LLXferManager::requestFile(const char* remote_filename, ELLPath remote_path, const LLHost& remote_host, BOOL delete_remote_on_completion, void (*callback)(void*,S32,void**,S32), void** user_data, BOOL is_priority) { LLXfer *xferp; xferp = (LLXfer *) new LLXfer_Mem(); if (xferp) { addToList(xferp, mReceiveList, is_priority); ((LLXfer_Mem *)xferp)->initializeRequest(getNextID(), remote_filename, remote_path, remote_host, delete_remote_on_completion, callback, user_data); startPendingDownloads(); } else { llerrs << "Xfer allocation error" << llendl; } } void LLXferManager::requestVFile(const LLUUID& local_id, const LLUUID& remote_id, LLAssetType::EType type, LLVFS* vfs, const LLHost& remote_host, void (*callback)(void**, S32), void** user_data, BOOL is_priority) { LLXfer *xferp; for (xferp = mReceiveList; xferp ; xferp = 
xferp->mNext) { if (xferp->getXferTypeTag() == LLXfer::XFER_VFILE && (((LLXfer_VFile*)xferp)->matchesLocalFile(local_id, type)) && (((LLXfer_VFile*)xferp)->matchesRemoteFile(remote_id, type)) && (remote_host == xferp->mRemoteHost) && (callback == xferp->mCallback) && (user_data == xferp->mCallbackDataHandle)) { // cout << "requested a xfer already in progress" << endl; return; } } xferp = (LLXfer *) new LLXfer_VFile(); if (xferp) { addToList(xferp, mReceiveList, is_priority); ((LLXfer_VFile *)xferp)->initializeRequest(getNextID(), vfs, local_id, remote_id, type, remote_host, callback, user_data); startPendingDownloads(); } else { llerrs << "Xfer allocation error" << llendl; } } /* void LLXferManager::requestXfer( const char *local_filename, BOOL delete_remote_on_completion, U64 xfer_id, const LLHost &remote_host, void (*callback)(void **,S32), void **user_data) { LLXfer *xferp; for (xferp = mReceiveList; xferp ; xferp = xferp->mNext) { if (xferp->getXferTypeTag() == LLXfer::XFER_FILE && (((LLXfer_File*)xferp)->matchesLocalFilename(local_filename)) && (xfer_id == xferp->mID) && (remote_host == xferp->mRemoteHost) && (callback == xferp->mCallback) && (user_data == xferp->mCallbackDataHandle)) { // cout << "requested a xfer already in progress" << endl; return; } } xferp = (LLXfer *) new LLXfer_File(); if (xferp) { xferp->mNext = mReceiveList; mReceiveList = xferp; ((LLXfer_File *)xferp)->initializeRequest(xfer_id,local_filename,"",LL_PATH_NONE,remote_host,delete_remote_on_completion,callback,user_data); startPendingDownloads(); } else { llerrs << "Xfer allcoation error" << llendl; } } void LLXferManager::requestXfer(U64 xfer_id, const LLHost &remote_host, BOOL delete_remote_on_completion, void (*callback)(void *,S32,void **,S32),void **user_data) { LLXfer *xferp; xferp = (LLXfer *) new LLXfer_Mem(); if (xferp) { xferp->mNext = mReceiveList; mReceiveList = xferp; ((LLXfer_Mem 
*)xferp)->initializeRequest(xfer_id,"",LL_PATH_NONE,remote_host,delete_remote_on_completion,callback,user_data); startPendingDownloads(); } else { llerrs << "Xfer allcoation error" << llendl; } } */ /////////////////////////////////////////////////////////// void LLXferManager::processReceiveData (LLMessageSystem *mesgsys, void ** /*user_data*/) { // there's sometimes an extra 4 bytes added to an xfer payload const S32 BUF_SIZE = LL_XFER_LARGE_PAYLOAD + 4; char fdata_buf[LL_XFER_LARGE_PAYLOAD + 4]; /* Flawfinder : ignore */ S32 fdata_size; U64 id; S32 packetnum; LLXfer * xferp; mesgsys->getU64Fast(_PREHASH_XferID, _PREHASH_ID, id); mesgsys->getS32Fast(_PREHASH_XferID, _PREHASH_Packet, packetnum); fdata_size = mesgsys->getSizeFast(_PREHASH_DataPacket,_PREHASH_Data); mesgsys->getBinaryDataFast(_PREHASH_DataPacket, _PREHASH_Data, fdata_buf, 0, 0, BUF_SIZE); xferp = findXfer(id, mReceiveList); if (!xferp) { char U64_BUF[MAX_STRING]; /* Flawfinder : ignore */ llwarns << "received xfer data from " << mesgsys->getSender() << " for non-existent xfer id: " << U64_to_str(id, U64_BUF, sizeof(U64_BUF)) << llendl; return; } S32 xfer_size; if (decodePacketNum(packetnum) != xferp->mPacketNum) // is the packet different from what we were expecting? 
{ // confirm it if it was a resend of the last one, since the confirmation might have gotten dropped if (decodePacketNum(packetnum) == (xferp->mPacketNum - 1)) { llinfos << "Reconfirming xfer " << xferp->mRemoteHost << ":" << xferp->getName() << " packet " << packetnum << llendl; sendConfirmPacket(mesgsys, id, decodePacketNum(packetnum), mesgsys->getSender()); } else { llinfos << "Ignoring xfer " << xferp->mRemoteHost << ":" << xferp->getName() << " recv'd packet " << packetnum << "; expecting " << xferp->mPacketNum << llendl; } return; } S32 result = 0; if (xferp->mPacketNum == 0) // first packet has size encoded as additional S32 at beginning of data { ntohmemcpy(&xfer_size,fdata_buf,MVT_S32,sizeof(S32)); // do any necessary things on first packet ie. allocate memory xferp->setXferSize(xfer_size); // adjust buffer start and size result = xferp->receiveData(&(fdata_buf[sizeof(S32)]),fdata_size-(sizeof(S32))); } else { result = xferp->receiveData(fdata_buf,fdata_size); } if (result == LL_ERR_CANNOT_OPEN_FILE) { xferp->abort(LL_ERR_CANNOT_OPEN_FILE); removeXfer(xferp,&mReceiveList); startPendingDownloads(); return; } xferp->mPacketNum++; // expect next packet if (!mUseAckThrottling) { // No throttling, confirm right away sendConfirmPacket(mesgsys, id, decodePacketNum(packetnum), mesgsys->getSender()); } else { // Throttling, put on queue to be confirmed later. 
LLXferAckInfo ack_info; ack_info.mID = id; ack_info.mPacketNum = decodePacketNum(packetnum); ack_info.mRemoteHost = mesgsys->getSender(); mXferAckQueue.push(ack_info); } if (isLastPacket(packetnum)) { xferp->processEOF(); removeXfer(xferp,&mReceiveList); startPendingDownloads(); } } /////////////////////////////////////////////////////////// void LLXferManager::sendConfirmPacket (LLMessageSystem *mesgsys, U64 id, S32 packetnum, const LLHost &remote_host) { #if LL_XFER_PROGRESS_MESSAGES if (!(packetnum % 50)) { cout << "confirming xfer packet #" << packetnum << endl; } #endif mesgsys->newMessageFast(_PREHASH_ConfirmXferPacket); mesgsys->nextBlockFast(_PREHASH_XferID); mesgsys->addU64Fast(_PREHASH_ID, id); mesgsys->addU32Fast(_PREHASH_Packet, packetnum); mesgsys->sendMessage(remote_host); } /////////////////////////////////////////////////////////// void LLXferManager::processFileRequest (LLMessageSystem *mesgsys, void ** /*user_data*/) { U64 id; char local_filename[MAX_STRING]; /* Flawfinder : ignore */ ELLPath local_path = LL_PATH_NONE; S32 result = LL_ERR_NOERR; LLUUID uuid; LLAssetType::EType type; S16 type_s16; BOOL b_use_big_packets; mesgsys->getBOOL("XferID", "UseBigPackets", b_use_big_packets); mesgsys->getU64Fast(_PREHASH_XferID, _PREHASH_ID, id); char U64_BUF[MAX_STRING]; /* Flawfinder : ignore */ llinfos << "xfer request id: " << U64_to_str(id, U64_BUF, sizeof(U64_BUF)) << " to " << mesgsys->getSender() << llendl; mesgsys->getStringFast(_PREHASH_XferID, _PREHASH_Filename, MAX_STRING, local_filename); U8 local_path_u8; mesgsys->getU8("XferID", "FilePath", local_path_u8); if( local_path_u8 < (U8)LL_PATH_COUNT ) { local_path = (ELLPath)local_path_u8; } else { llwarns << "Invalid file path in LLXferManager::processFileRequest() " << (U32)local_path_u8 << llendl; } mesgsys->getUUIDFast(_PREHASH_XferID, _PREHASH_VFileID, uuid); mesgsys->getS16Fast(_PREHASH_XferID, _PREHASH_VFileType, type_s16); type = (LLAssetType::EType)type_s16; LLXfer *xferp; if (uuid != 
LLUUID::null) { if(NULL == LLAssetType::lookup(type)) { llwarns << "Invalid type for xfer request: " << uuid << ":" << type_s16 << " to " << mesgsys->getSender() << llendl; return; } llinfos << "starting vfile transfer: " << uuid << "," << LLAssetType::lookup(type) << " to " << mesgsys->getSender() << llendl; if (! mVFS) { llwarns << "Attempt to send VFile w/o available VFS" << llendl; return; } xferp = (LLXfer *)new LLXfer_VFile(mVFS, uuid, type); if (xferp) { xferp->mNext = mSendList; mSendList = xferp; result = xferp->startSend(id,mesgsys->getSender()); } else { llerrs << "Xfer allcoation error" << llendl; } } else if (strlen(local_filename)) /* Flawfinder : ignore */ { std::string expanded_filename = gDirUtilp->getExpandedFilename( local_path, local_filename ); llinfos << "starting file transfer: " << expanded_filename << " to " << mesgsys->getSender() << llendl; BOOL delete_local_on_completion = FALSE; mesgsys->getBOOL("XferID", "DeleteOnCompletion", delete_local_on_completion); // -1 chunk_size causes it to use the default xferp = (LLXfer *)new LLXfer_File(expanded_filename, delete_local_on_completion, b_use_big_packets ? LL_XFER_LARGE_PAYLOAD : -1); if (xferp) { xferp->mNext = mSendList; mSendList = xferp; result = xferp->startSend(id,mesgsys->getSender()); } else { llerrs << "Xfer allcoation error" << llendl; } } else { char U64_BUF[MAX_STRING]; /* Flawfinder : ignore */ llinfos << "starting memory transfer: " << U64_to_str(id, U64_BUF, sizeof(U64_BUF)) << " to " << mesgsys->getSender() << llendl; xferp = findXfer(id, mSendList); if (xferp) { result = xferp->startSend(id,mesgsys->getSender()); } else { llinfos << "Warning: " << U64_BUF << " not found." 
<< llendl; result = LL_ERR_FILE_NOT_FOUND; } } if (result) { if (xferp) { xferp->abort(result); removeXfer(xferp,&mSendList); } else // can happen with a memory transfer not found { llinfos << "Aborting xfer to " << mesgsys->getSender() << " with error: " << result << llendl; mesgsys->newMessageFast(_PREHASH_AbortXfer); mesgsys->nextBlockFast(_PREHASH_XferID); mesgsys->addU64Fast(_PREHASH_ID, id); mesgsys->addS32Fast(_PREHASH_Result, result); mesgsys->sendMessage(mesgsys->getSender()); } } else if(xferp && (numActiveXfers(xferp->mRemoteHost) < mMaxOutgoingXfersPerCircuit)) { xferp->sendNextPacket(); changeNumActiveXfers(xferp->mRemoteHost,1); // llinfos << "***STARTING XFER IMMEDIATELY***" << llendl; } else { if(xferp) { llinfos << " queueing xfer request, " << numPendingXfers(xferp->mRemoteHost) << " ahead of this one" << llendl; } else { llwarns << "LLXferManager::processFileRequest() - no xfer found!" << llendl; } } } /////////////////////////////////////////////////////////// void LLXferManager::processConfirmation (LLMessageSystem *mesgsys, void ** /*user_data*/) { U64 id = 0; S32 packetNum = 0; mesgsys->getU64Fast(_PREHASH_XferID, _PREHASH_ID, id); mesgsys->getS32Fast(_PREHASH_XferID, _PREHASH_Packet, packetNum); LLXfer* xferp = findXfer(id, mSendList); if (xferp) { // cout << "confirmed packet #" << packetNum << " ping: "<< xferp->ACKTimer.getElapsedTimeF32() << endl; xferp->mWaitingForACK = FALSE; if (xferp->mStatus == e_LL_XFER_IN_PROGRESS) { xferp->sendNextPacket(); } else { removeXfer(xferp, &mSendList); } } } /////////////////////////////////////////////////////////// void LLXferManager::retransmitUnackedPackets () { LLXfer *xferp; LLXfer *delp; xferp = mReceiveList; while(xferp) { if (xferp->mStatus == e_LL_XFER_IN_PROGRESS) { // if the circuit dies, abort if (! 
gMessageSystem->mCircuitInfo.isCircuitAlive( xferp->mRemoteHost )) { llinfos << "Xfer found in progress on dead circuit, aborting" << llendl; xferp->mCallbackResult = LL_ERR_CIRCUIT_GONE; xferp->processEOF(); delp = xferp; xferp = xferp->mNext; removeXfer(delp,&mReceiveList); continue; } } xferp = xferp->mNext; } xferp = mSendList; updateHostStatus(); F32 et; while (xferp) { if (xferp->mWaitingForACK && ( (et = xferp->ACKTimer.getElapsedTimeF32()) > LL_PACKET_TIMEOUT)) { if (xferp->mRetries > LL_PACKET_RETRY_LIMIT) { llinfos << "dropping xfer " << xferp->mRemoteHost << ":" << xferp->getName() << " packet retransmit limit exceeded, xfer dropped" << llendl; xferp->abort(LL_ERR_TCP_TIMEOUT); delp = xferp; xferp = xferp->mNext; removeXfer(delp,&mSendList); } else { llinfos << "resending xfer " << xferp->mRemoteHost << ":" << xferp->getName() << " packet unconfirmed after: "<< et << " sec, packet " << xferp->mPacketNum << llendl; xferp->resendLastPacket(); xferp = xferp->mNext; } } else if ((xferp->mStatus == e_LL_XFER_REGISTERED) && ( (et = xferp->ACKTimer.getElapsedTimeF32()) > LL_XFER_REGISTRATION_TIMEOUT)) { llinfos << "registered xfer never requested, xfer dropped" << llendl; xferp->abort(LL_ERR_TCP_TIMEOUT); delp = xferp; xferp = xferp->mNext; removeXfer(delp,&mSendList); } else if (xferp->mStatus == e_LL_XFER_ABORTED) { llwarns << "Removing aborted xfer " << xferp->mRemoteHost << ":" << xferp->getName() << llendl; delp = xferp; xferp = xferp->mNext; removeXfer(delp,&mSendList); } else if (xferp->mStatus == e_LL_XFER_PENDING) { // llinfos << "*** numActiveXfers = " << numActiveXfers(xferp->mRemoteHost) << " mMaxOutgoingXfersPerCircuit = " << mMaxOutgoingXfersPerCircuit << llendl; if (numActiveXfers(xferp->mRemoteHost) < mMaxOutgoingXfersPerCircuit) { // llinfos << "bumping pending xfer to active" << llendl; xferp->sendNextPacket(); changeNumActiveXfers(xferp->mRemoteHost,1); } xferp = xferp->mNext; } else { xferp = xferp->mNext; } } // // HACK - if we're using 
xfer confirm throttling, throttle our xfer confirms here // so we don't blow through bandwidth. // while (mXferAckQueue.getLength()) { if (mAckThrottle.checkOverflow(1000.0f*8.0f)) { break; } //llinfos << "Confirm packet queue length:" << mXferAckQueue.getLength() << llendl; LLXferAckInfo ack_info; mXferAckQueue.pop(ack_info); //llinfos << "Sending confirm packet" << llendl; sendConfirmPacket(gMessageSystem, ack_info.mID, ack_info.mPacketNum, ack_info.mRemoteHost); mAckThrottle.throttleOverflow(1000.f*8.f); // Assume 1000 bytes/packet } } /////////////////////////////////////////////////////////// void LLXferManager::processAbort (LLMessageSystem *mesgsys, void ** /*user_data*/) { U64 id = 0; S32 result_code = 0; LLXfer * xferp; mesgsys->getU64Fast(_PREHASH_XferID, _PREHASH_ID, id); mesgsys->getS32Fast(_PREHASH_XferID, _PREHASH_Result, result_code); xferp = findXfer(id, mReceiveList); if (xferp) { xferp->mCallbackResult = result_code; xferp->processEOF(); removeXfer(xferp, &mReceiveList); startPendingDownloads(); } } /////////////////////////////////////////////////////////// void LLXferManager::startPendingDownloads() { // This method goes through the list, and starts pending // operations until active downloads == mMaxIncomingXfers. I copy // the pending xfers into a temporary data structure because the // xfers are stored as an intrusive linked list where older // requests get pushed toward the back. Thus, if we didn't do a // stateful iteration, it would be possible for old requests to // never start. LLXfer* xferp = mReceiveList; LLLinkedList pending_downloads; S32 download_count = 0; S32 pending_count = 0; while(xferp) { if(xferp->mStatus == e_LL_XFER_PENDING) { ++pending_count; // getLength() is O(N), so track it here. 
pending_downloads.addData(xferp); } else if(xferp->mStatus == e_LL_XFER_IN_PROGRESS) { ++download_count; } xferp = xferp->mNext; } S32 start_count = mMaxIncomingXfers - download_count; lldebugs << "LLXferManager::startPendingDownloads() - XFER_IN_PROGRESS: " << download_count << " XFER_PENDING: " << pending_count << " startring " << llmin(start_count, pending_count) << llendl; if((start_count > 0) && (pending_count > 0)) { S32 result; xferp = pending_downloads.getFirstData(); while(start_count-- && xferp) { result = xferp->startDownload(); if(result) { xferp->abort(result); ++start_count; } xferp = pending_downloads.getNextData(); } } } /////////////////////////////////////////////////////////// void LLXferManager::addToList(LLXfer* xferp, LLXfer*& head, BOOL is_priority) { if(is_priority) { xferp->mNext = NULL; LLXfer* next = head; if(next) { while(next->mNext) { next = next->mNext; } next->mNext = xferp; } else { head = xferp; } } else { xferp->mNext = head; head = xferp; } } /////////////////////////////////////////////////////////// // Globals and C routines /////////////////////////////////////////////////////////// LLXferManager *gXferManager = NULL; void start_xfer_manager(LLVFS *vfs) { gXferManager = new LLXferManager(vfs); } void cleanup_xfer_manager() { if (gXferManager) { delete(gXferManager); gXferManager = NULL; } } void process_confirm_packet (LLMessageSystem *mesgsys, void **user_data) { gXferManager->processConfirmation(mesgsys,user_data); } void process_request_xfer(LLMessageSystem *mesgsys, void **user_data) { gXferManager->processFileRequest(mesgsys,user_data); } void continue_file_receive(LLMessageSystem *mesgsys, void **user_data) { #if LL_TEST_XFER_REXMIT if (ll_frand() > 0.05f) { #endif gXferManager->processReceiveData(mesgsys,user_data); #if LL_TEST_XFER_REXMIT } else { cout << "oops! 
dropped a xfer packet" << endl; } #endif } void process_abort_xfer(LLMessageSystem *mesgsys, void **user_data) { gXferManager->processAbort(mesgsys,user_data); }