summaryrefslogtreecommitdiff
path: root/indra/llmessage
diff options
context:
space:
mode:
Diffstat (limited to 'indra/llmessage')
-rw-r--r--indra/llmessage/llavatarnamecache.cpp16
-rw-r--r--indra/llmessage/llinstantmessage.cpp208
-rw-r--r--indra/llmessage/llinstantmessage.h53
-rw-r--r--indra/llmessage/llxfer.cpp58
-rw-r--r--indra/llmessage/llxfer.h7
-rw-r--r--indra/llmessage/llxfer_file.cpp68
-rw-r--r--indra/llmessage/llxfer_file.h6
-rw-r--r--indra/llmessage/llxfer_mem.cpp22
-rw-r--r--indra/llmessage/llxfer_mem.h1
-rw-r--r--indra/llmessage/llxfer_vfile.cpp92
-rw-r--r--indra/llmessage/llxfer_vfile.h8
-rw-r--r--indra/llmessage/llxfermanager.cpp735
-rw-r--r--indra/llmessage/llxfermanager.h50
13 files changed, 613 insertions, 711 deletions
diff --git a/indra/llmessage/llavatarnamecache.cpp b/indra/llmessage/llavatarnamecache.cpp
index abe0e46e5d..ba1a2a035e 100644
--- a/indra/llmessage/llavatarnamecache.cpp
+++ b/indra/llmessage/llavatarnamecache.cpp
@@ -192,13 +192,21 @@ void LLAvatarNameCache::requestAvatarNameCache_(std::string url, std::vector<LLU
LL_DEBUGS("AvNameCache") << "Entering coroutine " << LLCoros::instance().getName()
<< " with url '" << url << "', requesting " << agentIds.size() << " Agent Ids" << LL_ENDL;
+ // Check pointer that can be cleaned up by cleanupClass()
+ if (!sHttpRequest || !sHttpOptions || !sHttpHeaders)
+ {
+ LL_WARNS("AvNameCache") << " Trying to request name cache when http pointers are not initialized." << LL_ENDL;
+ return;
+ }
+
+ LLSD httpResults;
+
try
{
bool success = true;
LLCoreHttpUtil::HttpCoroutineAdapter httpAdapter("NameCache", LLAvatarNameCache::sHttpPolicy);
LLSD results = httpAdapter.getAndSuspend(sHttpRequest, url);
- LLSD httpResults;
LL_DEBUGS() << results << LL_ENDL;
@@ -237,6 +245,7 @@ void LLAvatarNameCache::requestAvatarNameCache_(std::string url, std::vector<LLU
{
LOG_UNHANDLED_EXCEPTION(STRINGIZE("coroutine " << LLCoros::instance().getName()
<< "('" << url << "', " << agentIds.size()
+ << " http result: " << httpResults.asString()
<< " Agent Ids)"));
throw;
}
@@ -326,6 +335,11 @@ void LLAvatarNameCache::handleAgentError(const LLUUID& agent_id)
void LLAvatarNameCache::processName(const LLUUID& agent_id, const LLAvatarName& av_name)
{
+ if (agent_id.isNull())
+ {
+ return;
+ }
+
// Add to the cache
sCache[agent_id] = av_name;
diff --git a/indra/llmessage/llinstantmessage.cpp b/indra/llmessage/llinstantmessage.cpp
index b7f3e6e4f7..dd5a655d7e 100644
--- a/indra/llmessage/llinstantmessage.cpp
+++ b/indra/llmessage/llinstantmessage.cpp
@@ -51,98 +51,6 @@ const std::string INTERACTIVE_SYSTEM_FROM("F387446C-37C4-45f2-A438-D99CBDBB563B"
const S32 IM_TTL = 1;
-/**
- * LLIMInfo
- */
-LLIMInfo::LLIMInfo() :
- mFromGroup(FALSE),
- mParentEstateID(0),
- mOffline(0),
- mViewerThinksToIsOnline(false),
- mIMType(IM_NOTHING_SPECIAL),
- mTimeStamp(0),
- mTTL(IM_TTL)
-{
-}
-
-LLIMInfo::LLIMInfo(
- const LLUUID& from_id,
- BOOL from_group,
- const LLUUID& to_id,
- EInstantMessage im_type,
- const std::string& name,
- const std::string& message,
- const LLUUID& id,
- U32 parent_estate_id,
- const LLUUID& region_id,
- const LLVector3& position,
- LLSD data,
- U8 offline,
- U32 timestamp,
- S32 ttl) :
- mFromID(from_id),
- mFromGroup(from_group),
- mToID(to_id),
- mParentEstateID(0),
- mRegionID(region_id),
- mPosition(position),
- mOffline(offline),
- mViewerThinksToIsOnline(false),
- mIMType(im_type),
- mID(id),
- mTimeStamp(timestamp),
- mName(name),
- mMessage(message),
- mData(data),
- mTTL(ttl)
-{
-}
-
-LLIMInfo::LLIMInfo(LLMessageSystem* msg, S32 ttl) :
- mViewerThinksToIsOnline(false),
- mTTL(ttl)
-{
- unpackMessageBlock(msg);
-}
-
-LLIMInfo::~LLIMInfo()
-{
-}
-
-void LLIMInfo::packInstantMessage(LLMessageSystem* msg) const
-{
- LL_DEBUGS() << "LLIMInfo::packInstantMessage()" << LL_ENDL;
- msg->newMessageFast(_PREHASH_ImprovedInstantMessage);
- packMessageBlock(msg);
-}
-
-void LLIMInfo::packMessageBlock(LLMessageSystem* msg) const
-{
- // Construct binary bucket
- std::vector<U8> bucket;
- if (mData.has("binary_bucket"))
- {
- bucket = mData["binary_bucket"].asBinary();
- }
- pack_instant_message_block(
- msg,
- mFromID,
- mFromGroup,
- LLUUID::null,
- mToID,
- mName,
- mMessage,
- mOffline,
- mIMType,
- mID,
- mParentEstateID,
- mRegionID,
- mPosition,
- mTimeStamp,
- &bucket[0],
- bucket.size());
-}
-
void pack_instant_message(
LLMessageSystem* msg,
const LLUUID& from_id,
@@ -253,120 +161,4 @@ void pack_instant_message_block(
msg->addBinaryDataFast(_PREHASH_BinaryBucket, bb, binary_bucket_size);
}
-void LLIMInfo::unpackMessageBlock(LLMessageSystem* msg)
-{
- msg->getUUIDFast(_PREHASH_AgentData, _PREHASH_AgentID, mFromID);
- msg->getBOOLFast(_PREHASH_MessageBlock, _PREHASH_FromGroup, mFromGroup);
- msg->getUUIDFast(_PREHASH_MessageBlock, _PREHASH_ToAgentID, mToID);
- msg->getU32Fast(_PREHASH_MessageBlock, _PREHASH_ParentEstateID, mParentEstateID);
- msg->getUUIDFast(_PREHASH_MessageBlock, _PREHASH_RegionID, mRegionID);
- msg->getVector3Fast(_PREHASH_MessageBlock, _PREHASH_Position, mPosition);
- msg->getU8Fast(_PREHASH_MessageBlock, _PREHASH_Offline, mOffline);
- U8 dialog;
- msg->getU8Fast(_PREHASH_MessageBlock, _PREHASH_Dialog, dialog);
- mIMType = (EInstantMessage) dialog;
- msg->getUUIDFast(_PREHASH_MessageBlock, _PREHASH_ID, mID);
- msg->getU32Fast(_PREHASH_MessageBlock, _PREHASH_Timestamp, mTimeStamp);
- msg->getStringFast(_PREHASH_MessageBlock, _PREHASH_FromAgentName, mName);
-
- msg->getStringFast(_PREHASH_MessageBlock, _PREHASH_Message, mMessage);
-
- S32 binary_bucket_size = llmin(
- MTUBYTES,
- msg->getSizeFast(
- _PREHASH_MessageBlock,
- _PREHASH_BinaryBucket));
- if(binary_bucket_size > 0)
- {
- std::vector<U8> bucket;
- bucket.resize(binary_bucket_size);
-
- msg->getBinaryDataFast(
- _PREHASH_MessageBlock,
- _PREHASH_BinaryBucket,
- &bucket[0],
- 0,
- 0,
- binary_bucket_size);
- mData["binary_bucket"] = bucket;
- }
- else
- {
- mData.clear();
- }
-}
-
-LLSD im_info_to_llsd(LLPointer<LLIMInfo> im_info)
-{
- LLSD param_version;
- param_version["version"] = 1;
- LLSD param_message;
- param_message["from_id"] = im_info->mFromID;
- param_message["from_group"] = im_info->mFromGroup;
- param_message["to_id"] = im_info->mToID;
- param_message["from_name"] = im_info->mName;
- param_message["message"] = im_info->mMessage;
- param_message["type"] = (S32)im_info->mIMType;
- param_message["id"] = im_info->mID;
- param_message["timestamp"] = (S32)im_info->mTimeStamp;
- param_message["offline"] = (S32)im_info->mOffline;
- param_message["parent_estate_id"] = (S32)im_info->mParentEstateID;
- param_message["region_id"] = im_info->mRegionID;
- param_message["position"] = ll_sd_from_vector3(im_info->mPosition);
- param_message["data"] = im_info->mData;
- param_message["ttl"] = im_info->mTTL;
-
- LLSD param_agent;
- param_agent["agent_id"] = im_info->mFromID;
-
- LLSD params;
- params["version_params"] = param_version;
- params["message_params"] = param_message;
- params["agent_params"] = param_agent;
-
- return params;
-}
-
-LLPointer<LLIMInfo> llsd_to_im_info(const LLSD& im_info_sd)
-{
- LLSD param_message = im_info_sd["message_params"];
- LLSD param_agent = im_info_sd["agent_params"];
-
- LLPointer<LLIMInfo> im_info = new LLIMInfo(
- param_message["from_id"].asUUID(),
- param_message["from_group"].asBoolean(),
- param_message["to_id"].asUUID(),
- (EInstantMessage) param_message["type"].asInteger(),
- param_message["from_name"].asString(),
- param_message["message"].asString(),
- param_message["id"].asUUID(),
- (U32) param_message["parent_estate_id"].asInteger(),
- param_message["region_id"].asUUID(),
- ll_vector3_from_sd(param_message["position"]),
- param_message["data"],
- (U8) param_message["offline"].asInteger(),
- (U32) param_message["timestamp"].asInteger(),
- param_message["ttl"].asInteger());
-
- return im_info;
-}
-
-LLPointer<LLIMInfo> LLIMInfo::clone()
-{
- return new LLIMInfo(
- mFromID,
- mFromGroup,
- mToID,
- mIMType,
- mName,
- mMessage,
- mID,
- mParentEstateID,
- mRegionID,
- mPosition,
- mData,
- mOffline,
- mTimeStamp,
- mTTL);
-}
diff --git a/indra/llmessage/llinstantmessage.h b/indra/llmessage/llinstantmessage.h
index f7118f8ccf..55cda15405 100644
--- a/indra/llmessage/llinstantmessage.h
+++ b/indra/llmessage/llinstantmessage.h
@@ -177,59 +177,6 @@ extern const std::string INTERACTIVE_SYSTEM_FROM;
// Number of retry attempts on sending the im.
extern const S32 IM_TTL;
-
-class LLIMInfo : public LLRefCount
-{
-protected:
- LLIMInfo();
- ~LLIMInfo();
-
-public:
- LLIMInfo(LLMessageSystem* msg,
- S32 ttl = IM_TTL);
-
- LLIMInfo(
- const LLUUID& from_id,
- BOOL from_group,
- const LLUUID& to_id,
- EInstantMessage im_type,
- const std::string& name,
- const std::string& message,
- const LLUUID& id,
- U32 parent_estate_id,
- const LLUUID& region_id,
- const LLVector3& position,
- LLSD data,
- U8 offline,
- U32 timestamp,
- S32 ttl = IM_TTL);
-
- void packInstantMessage(LLMessageSystem* msg) const;
- void packMessageBlock(LLMessageSystem* msg) const;
- void unpackMessageBlock(LLMessageSystem* msg);
- LLPointer<LLIMInfo> clone();
-public:
- LLUUID mFromID;
- BOOL mFromGroup;
- LLUUID mToID;
- U32 mParentEstateID;
- LLUUID mRegionID;
- LLVector3 mPosition;
- U8 mOffline;
- bool mViewerThinksToIsOnline;
- EInstantMessage mIMType;
- LLUUID mID;
- U32 mTimeStamp;
- std::string mName;
- std::string mMessage;
- LLSD mData;
-
- S32 mTTL;
-};
-
-LLPointer<LLIMInfo> llsd_to_im_info(const LLSD& im_info_sd);
-LLSD im_info_to_llsd(LLPointer<LLIMInfo> im_info);
-
void pack_instant_message(
LLMessageSystem* msgsystem,
const LLUUID& from_id,
diff --git a/indra/llmessage/llxfer.cpp b/indra/llmessage/llxfer.cpp
index e0590dfdff..c8b9d5d19f 100644
--- a/indra/llmessage/llxfer.cpp
+++ b/indra/llmessage/llxfer.cpp
@@ -63,7 +63,6 @@ void LLXfer::init (S32 chunk_size)
mXferSize = 0;
mStatus = e_LL_XFER_UNINITIALIZED;
- mNext = NULL;
mWaitingForACK = FALSE;
mCallback = NULL;
@@ -99,7 +98,22 @@ void LLXfer::cleanup ()
S32 LLXfer::startSend (U64 xfer_id, const LLHost &remote_host)
{
- LL_WARNS() << "undifferentiated LLXfer::startSend for " << getFileName() << LL_ENDL;
+ LL_WARNS("Xfer") << "unexpected call to base class LLXfer::startSend for " << getFileName() << LL_ENDL;
+ return (-1);
+}
+
+///////////////////////////////////////////////////////////
+
+void LLXfer::closeFileHandle()
+{
+ LL_WARNS("Xfer") << "unexpected call to base class LLXfer::closeFileHandle for " << getFileName() << LL_ENDL;
+}
+
+///////////////////////////////////////////////////////////
+
+S32 LLXfer::reopenFileHandle()
+{
+ LL_WARNS("Xfer") << "unexpected call to base class LLXfer::reopenFileHandle for " << getFileName() << LL_ENDL;
return (-1);
}
@@ -115,7 +129,7 @@ void LLXfer::setXferSize (S32 xfer_size)
S32 LLXfer::startDownload()
{
- LL_WARNS() << "undifferentiated LLXfer::startDownload for " << getFileName()
+ LL_WARNS("Xfer") << "undifferentiated LLXfer::startDownload for " << getFileName()
<< LL_ENDL;
return (-1);
}
@@ -127,20 +141,20 @@ S32 LLXfer::receiveData (char *datap, S32 data_size)
S32 retval = 0;
if (((S32) mBufferLength + data_size) > getMaxBufferSize())
- {
+ { // Write existing data to disk if it's larger than the buffer size
retval = flush();
}
if (!retval)
{
if (datap != NULL)
- {
+ { // Append new data to mBuffer
memcpy(&mBuffer[mBufferLength],datap,data_size); /*Flawfinder: ignore*/
mBufferLength += data_size;
}
else
{
- LL_ERRS() << "NULL data passed in receiveData" << LL_ENDL;
+ LL_ERRS("Xfer") << "NULL data passed in receiveData" << LL_ENDL;
}
}
@@ -163,7 +177,7 @@ S32 LLXfer::flush()
S32 LLXfer::suck(S32 start_position)
{
- LL_WARNS() << "Attempted to send a packet outside the buffer bounds in LLXfer::suck()" << LL_ENDL;
+ LL_WARNS("Xfer") << "Attempted to send a packet outside the buffer bounds in LLXfer::suck()" << LL_ENDL;
return (-1);
}
@@ -196,7 +210,7 @@ void LLXfer::sendPacket(S32 packet_num)
if (fdata_size < 0)
{
- LL_WARNS() << "negative data size in xfer send, aborting" << LL_ENDL;
+ LL_WARNS("Xfer") << "negative data size in xfer send, aborting" << LL_ENDL;
abort(LL_ERR_EOF);
return;
}
@@ -248,7 +262,12 @@ void LLXfer::sendPacket(S32 packet_num)
gMessageSystem->nextBlockFast(_PREHASH_DataPacket);
gMessageSystem->addBinaryDataFast(_PREHASH_Data, &fdata_buf,fdata_size);
- gMessageSystem->sendMessage(mRemoteHost);
+ S32 sent_something = gMessageSystem->sendMessage(mRemoteHost);
+ if (sent_something == 0)
+ {
+ abort(LL_ERR_CIRCUIT_GONE);
+ return;
+ }
ACKTimer.reset();
mWaitingForACK = TRUE;
@@ -289,12 +308,12 @@ S32 LLXfer::processEOF()
if (LL_ERR_NOERR == mCallbackResult)
{
- LL_INFOS() << "xfer from " << mRemoteHost << " complete: " << getFileName()
+ LL_INFOS("Xfer") << "xfer from " << mRemoteHost << " complete: " << getFileName()
<< LL_ENDL;
}
else
{
- LL_INFOS() << "xfer from " << mRemoteHost << " failed or aborted, code "
+ LL_INFOS("Xfer") << "xfer from " << mRemoteHost << " failed, code "
<< mCallbackResult << ": " << getFileName() << LL_ENDL;
}
@@ -323,15 +342,18 @@ void LLXfer::abort (S32 result_code)
{
mCallbackResult = result_code;
- LL_INFOS() << "Aborting xfer from " << mRemoteHost << " named " << getFileName()
+ LL_INFOS("Xfer") << "Aborting xfer from " << mRemoteHost << " named " << getFileName()
<< " - error: " << result_code << LL_ENDL;
- gMessageSystem->newMessageFast(_PREHASH_AbortXfer);
- gMessageSystem->nextBlockFast(_PREHASH_XferID);
- gMessageSystem->addU64Fast(_PREHASH_ID, mID);
- gMessageSystem->addS32Fast(_PREHASH_Result, result_code);
-
- gMessageSystem->sendMessage(mRemoteHost);
+ if (result_code != LL_ERR_CIRCUIT_GONE)
+ {
+ gMessageSystem->newMessageFast(_PREHASH_AbortXfer);
+ gMessageSystem->nextBlockFast(_PREHASH_XferID);
+ gMessageSystem->addU64Fast(_PREHASH_ID, mID);
+ gMessageSystem->addS32Fast(_PREHASH_Result, result_code);
+
+ gMessageSystem->sendMessage(mRemoteHost);
+ }
mStatus = e_LL_XFER_ABORTED;
}
diff --git a/indra/llmessage/llxfer.h b/indra/llmessage/llxfer.h
index edf5eeb82d..a906674dec 100644
--- a/indra/llmessage/llxfer.h
+++ b/indra/llmessage/llxfer.h
@@ -54,7 +54,6 @@ class LLXfer
S32 mChunkSize;
public:
- LLXfer *mNext;
U64 mID;
S32 mPacketNum;
@@ -62,7 +61,7 @@ class LLXfer
S32 mXferSize;
char *mBuffer;
- U32 mBufferLength;
+ U32 mBufferLength; // Size of valid data, not actual allocated buffer size
U32 mBufferStartOffset;
BOOL mBufferContainsEOF;
@@ -90,7 +89,9 @@ class LLXfer
void init(S32 chunk_size);
virtual void cleanup();
- virtual S32 startSend (U64 xfer_id, const LLHost &remote_host);
+ virtual S32 startSend(U64 xfer_id, const LLHost &remote_host);
+ virtual void closeFileHandle();
+ virtual S32 reopenFileHandle();
virtual void sendPacket(S32 packet_num);
virtual void sendNextPacket();
virtual void resendLastPacket();
diff --git a/indra/llmessage/llxfer_file.cpp b/indra/llmessage/llxfer_file.cpp
index 8e2ed890e7..7fd4222fb7 100644
--- a/indra/llmessage/llxfer_file.cpp
+++ b/indra/llmessage/llxfer_file.cpp
@@ -102,12 +102,12 @@ void LLXfer_File::cleanup ()
if (mDeleteLocalOnCompletion)
{
- LL_DEBUGS() << "Removing file: " << mLocalFilename << LL_ENDL;
+ LL_DEBUGS("Xfer") << "Removing file: " << mLocalFilename << LL_ENDL;
LLFile::remove(mLocalFilename, ENOENT);
}
else
{
- LL_DEBUGS() << "Keeping local file: " << mLocalFilename << LL_ENDL;
+ LL_DEBUGS("Xfer") << "Keeping local file: " << mLocalFilename << LL_ENDL;
}
LLXfer::cleanup();
@@ -139,7 +139,7 @@ S32 LLXfer_File::initializeRequest(U64 xfer_id,
mCallbackDataHandle = user_data;
mCallbackResult = LL_ERR_NOERR;
- LL_INFOS() << "Requesting xfer from " << remote_host << " for file: " << mLocalFilename << LL_ENDL;
+ LL_INFOS("Xfer") << "Requesting xfer from " << remote_host << " for file: " << mLocalFilename << LL_ENDL;
if (mBuffer)
{
@@ -167,6 +167,7 @@ S32 LLXfer_File::startDownload()
fclose(mFp);
mFp = NULL;
+ // tbd - is it premature to send this message if the queue is backed up?
gMessageSystem->newMessageFast(_PREHASH_RequestXfer);
gMessageSystem->nextBlockFast(_PREHASH_XferID);
gMessageSystem->addU64Fast(_PREHASH_ID, mID);
@@ -182,7 +183,7 @@ S32 LLXfer_File::startDownload()
}
else
{
- LL_WARNS() << "Couldn't create file to be received!" << LL_ENDL;
+ LL_WARNS("Xfer") << "Couldn't create file to be received!" << LL_ENDL;
retval = -1;
}
@@ -206,7 +207,8 @@ S32 LLXfer_File::startSend (U64 xfer_id, const LLHost &remote_host)
mBufferLength = 0;
mBufferStartOffset = 0;
-
+
+ // We leave the file open, assuming we'll start reading and sending soon
mFp = LLFile::fopen(mLocalFilename,"rb"); /* Flawfinder : ignore */
if (mFp)
{
@@ -223,7 +225,7 @@ S32 LLXfer_File::startSend (U64 xfer_id, const LLHost &remote_host)
}
else
{
- LL_INFOS() << "Warning: " << mLocalFilename << " not found." << LL_ENDL;
+ LL_INFOS("Xfer") << "Warning: " << mLocalFilename << " not found." << LL_ENDL;
return (LL_ERR_FILE_NOT_FOUND);
}
@@ -233,6 +235,36 @@ S32 LLXfer_File::startSend (U64 xfer_id, const LLHost &remote_host)
}
///////////////////////////////////////////////////////////
+void LLXfer_File::closeFileHandle()
+{
+ if (mFp)
+ {
+ fclose(mFp);
+ mFp = NULL;
+ }
+}
+
+///////////////////////////////////////////////////////////
+
+S32 LLXfer_File::reopenFileHandle()
+{
+ S32 retval = LL_ERR_NOERR; // presume success
+
+ if (mFp == NULL)
+ {
+ mFp = LLFile::fopen(mLocalFilename,"rb"); /* Flawfinder : ignore */
+ if (mFp == NULL)
+ {
+ LL_INFOS("Xfer") << "Warning: " << mLocalFilename << " not found when re-opening file" << LL_ENDL;
+ retval = LL_ERR_FILE_NOT_FOUND;
+ }
+ }
+
+ return retval;
+}
+
+
+///////////////////////////////////////////////////////////
S32 LLXfer_File::getMaxBufferSize ()
{
@@ -279,18 +311,21 @@ S32 LLXfer_File::flush()
{
if (mFp)
{
- LL_ERRS() << "Overwriting open file pointer!" << LL_ENDL;
+ LL_ERRS("Xfer") << "Overwriting open file pointer!" << LL_ENDL;
}
mFp = LLFile::fopen(mTempFilename,"a+b"); /* Flawfinder : ignore */
if (mFp)
{
- if (fwrite(mBuffer,1,mBufferLength,mFp) != mBufferLength)
+ S32 write_size = fwrite(mBuffer,1,mBufferLength,mFp);
+ if (write_size != mBufferLength)
{
- LL_WARNS() << "Short write" << LL_ENDL;
+ LL_WARNS("Xfer") << "Non-matching write size, requested " << mBufferLength
+ << " but wrote " << write_size
+ << LL_ENDL;
}
-// LL_INFOS() << "******* wrote " << mBufferLength << " bytes of file xfer" << LL_ENDL;
+// LL_INFOS("Xfer") << "******* wrote " << mBufferLength << " bytes of file xfer" << LL_ENDL;
fclose(mFp);
mFp = NULL;
@@ -298,7 +333,7 @@ S32 LLXfer_File::flush()
}
else
{
- LL_WARNS() << "LLXfer_File::flush() unable to open " << mTempFilename << " for writing!" << LL_ENDL;
+ LL_WARNS("Xfer") << "LLXfer_File::flush() unable to open " << mTempFilename << " for writing!" << LL_ENDL;
retval = LL_ERR_CANNOT_OPEN_FILE;
}
}
@@ -329,18 +364,18 @@ S32 LLXfer_File::processEOF()
{
#if !LL_WINDOWS
S32 error_number = errno;
- LL_INFOS() << "Rename failure (" << error_number << ") - "
+ LL_INFOS("Xfer") << "Rename failure (" << error_number << ") - "
<< mTempFilename << " to " << mLocalFilename << LL_ENDL;
if(EXDEV == error_number)
{
if(copy_file(mTempFilename, mLocalFilename) == 0)
{
- LL_INFOS() << "Rename across mounts; copying+unlinking the file instead." << LL_ENDL;
+ LL_INFOS("Xfer") << "Rename across mounts; copying+unlinking the file instead." << LL_ENDL;
unlink(mTempFilename.c_str());
}
else
{
- LL_WARNS() << "Copy failure - " << mTempFilename << " to "
+ LL_WARNS("Xfer") << "Copy failure - " << mTempFilename << " to "
<< mLocalFilename << LL_ENDL;
}
}
@@ -354,11 +389,11 @@ S32 LLXfer_File::processEOF()
//LL_WARNS() << "File " << mLocalFilename << " does "
// << (!fp ? "not" : "" ) << " exit." << LL_ENDL;
//if(fp) fclose(fp);
- LL_WARNS() << "Rename fatally failed, can only handle EXDEV ("
+ LL_WARNS("Xfer") << "Rename fatally failed, can only handle EXDEV ("
<< EXDEV << ")" << LL_ENDL;
}
#else
- LL_WARNS() << "Rename failure - " << mTempFilename << " to "
+ LL_WARNS("Xfer") << "Rename failure - " << mTempFilename << " to "
<< mLocalFilename << LL_ENDL;
#endif
}
@@ -437,3 +472,4 @@ S32 copy_file(const std::string& from, const std::string& to)
return rv;
}
#endif
+
diff --git a/indra/llmessage/llxfer_file.h b/indra/llmessage/llxfer_file.h
index a37dda6732..ab9374549e 100644
--- a/indra/llmessage/llxfer_file.h
+++ b/indra/llmessage/llxfer_file.h
@@ -61,8 +61,10 @@ class LLXfer_File : public LLXfer
virtual S32 startDownload();
virtual S32 processEOF();
-
- virtual S32 startSend (U64 xfer_id, const LLHost &remote_host);
+
+ virtual S32 startSend(U64 xfer_id, const LLHost &remote_host);
+ virtual void closeFileHandle();
+ virtual S32 reopenFileHandle();
virtual S32 suck(S32 start_position);
virtual S32 flush();
diff --git a/indra/llmessage/llxfer_mem.cpp b/indra/llmessage/llxfer_mem.cpp
index 3bea08f2e5..78a3e4f558 100644
--- a/indra/llmessage/llxfer_mem.cpp
+++ b/indra/llmessage/llxfer_mem.cpp
@@ -80,28 +80,6 @@ void LLXfer_Mem::setXferSize (S32 xfer_size)
///////////////////////////////////////////////////////////
-U64 LLXfer_Mem::registerXfer(U64 xfer_id, const void *datap, const S32 length)
-{
- mID = xfer_id;
-
- if (datap)
- {
- setXferSize(length);
- if (mBuffer)
- {
- memcpy(mBuffer,datap,length); /* Flawfinder : ignore */
- mBufferLength = length;
- }
- else
- {
- xfer_id = 0;
- }
- }
-
- mStatus = e_LL_XFER_REGISTERED;
- return (xfer_id);
-}
-
S32 LLXfer_Mem::startSend (U64 xfer_id, const LLHost &remote_host)
{
S32 retval = LL_ERR_NOERR; // presume success
diff --git a/indra/llmessage/llxfer_mem.h b/indra/llmessage/llxfer_mem.h
index b5adf837df..d07779de87 100644
--- a/indra/llmessage/llxfer_mem.h
+++ b/indra/llmessage/llxfer_mem.h
@@ -53,7 +53,6 @@ class LLXfer_Mem : public LLXfer
virtual void cleanup();
virtual S32 startSend (U64 xfer_id, const LLHost &remote_host);
- virtual U64 registerXfer(U64 xfer_id, const void *datap, const S32 length);
virtual void setXferSize (S32 data_size);
virtual S32 initializeRequest(U64 xfer_id,
diff --git a/indra/llmessage/llxfer_vfile.cpp b/indra/llmessage/llxfer_vfile.cpp
index 4a378d1d34..ddc24342f6 100644
--- a/indra/llmessage/llxfer_vfile.cpp
+++ b/indra/llmessage/llxfer_vfile.cpp
@@ -79,8 +79,20 @@ void LLXfer_VFile::init (LLVFS *vfs, const LLUUID &local_id, LLAssetType::EType
void LLXfer_VFile::cleanup ()
{
- LLVFile file(mVFS, mTempID, mType, LLVFile::WRITE);
- file.remove();
+ if (mTempID.notNull() &&
+ mDeleteTempFile)
+ {
+ if (mVFS->getExists(mTempID, mType))
+ {
+ LLVFile file(mVFS, mTempID, mType, LLVFile::WRITE);
+ file.remove();
+ }
+ else
+ {
+ LL_WARNS("Xfer") << "LLXfer_VFile::cleanup() can't open to delete VFS file " << mTempID << "." << LLAssetType::lookup(mType)
+ << ", mRemoteID is " << mRemoteID << LL_ENDL;
+ }
+ }
delete mVFile;
mVFile = NULL;
@@ -118,7 +130,7 @@ S32 LLXfer_VFile::initializeRequest(U64 xfer_id,
mName = llformat("VFile %s:%s", id_string.c_str(), LLAssetType::lookup(mType));
- LL_INFOS() << "Requesting " << mName << LL_ENDL;
+ LL_INFOS("Xfer") << "Requesting " << mName << LL_ENDL;
if (mBuffer)
{
@@ -131,6 +143,7 @@ S32 LLXfer_VFile::initializeRequest(U64 xfer_id,
mBufferLength = 0;
mPacketNum = 0;
mTempID.generate();
+ mDeleteTempFile = TRUE;
mStatus = e_LL_XFER_PENDING;
return retval;
}
@@ -140,7 +153,8 @@ S32 LLXfer_VFile::initializeRequest(U64 xfer_id,
S32 LLXfer_VFile::startDownload()
{
S32 retval = 0; // presume success
- LLVFile file(mVFS, mTempID, mType, LLVFile::APPEND);
+
+ // Don't need to create the file here, it will happen when data arrives
gMessageSystem->newMessageFast(_PREHASH_RequestXfer);
gMessageSystem->nextBlockFast(_PREHASH_XferID);
@@ -184,6 +198,8 @@ S32 LLXfer_VFile::startSend (U64 xfer_id, const LLHost &remote_host)
if (mVFile->getSize() <= 0)
{
+ LL_WARNS("Xfer") << "LLXfer_VFile::startSend() VFS file " << mLocalID << "." << LLAssetType::lookup(mType)
+ << " has unexpected file size of " << mVFile->getSize() << LL_ENDL;
delete mVFile;
mVFile = NULL;
@@ -198,6 +214,7 @@ S32 LLXfer_VFile::startSend (U64 xfer_id, const LLHost &remote_host)
}
else
{
+ LL_WARNS("Xfer") << "LLXfer_VFile::startSend() can't read VFS file " << mLocalID << "." << LLAssetType::lookup(mType) << LL_ENDL;
retval = LL_ERR_FILE_NOT_FOUND;
}
@@ -205,6 +222,41 @@ S32 LLXfer_VFile::startSend (U64 xfer_id, const LLHost &remote_host)
}
///////////////////////////////////////////////////////////
+
+void LLXfer_VFile::closeFileHandle()
+{
+ if (mVFile)
+ {
+ delete mVFile;
+ mVFile = NULL;
+ }
+}
+
+///////////////////////////////////////////////////////////
+
+S32 LLXfer_VFile::reopenFileHandle()
+{
+ S32 retval = LL_ERR_NOERR; // presume success
+
+ if (mVFile == NULL)
+ {
+ if (mVFS->getExists(mLocalID, mType))
+ {
+ mVFile = new LLVFile(mVFS, mLocalID, mType, LLVFile::READ);
+ }
+ else
+ {
+ LL_WARNS("Xfer") << "LLXfer_VFile::reopenFileHandle() can't read VFS file " << mLocalID << "." << LLAssetType::lookup(mType) << LL_ENDL;
+ retval = LL_ERR_FILE_NOT_FOUND;
+ }
+ }
+
+ return retval;
+}
+
+
+///////////////////////////////////////////////////////////
+
void LLXfer_VFile::setXferSize (S32 xfer_size)
{
LLXfer::setXferSize(xfer_size);
@@ -236,8 +288,8 @@ S32 LLXfer_VFile::suck(S32 start_position)
// grab a buffer from the right place in the file
if (! mVFile->seek(start_position, 0))
{
- LL_WARNS() << "VFile Xfer Can't seek to position " << start_position << ", file length " << mVFile->getSize() << LL_ENDL;
- LL_WARNS() << "While sending file " << mLocalID << LL_ENDL;
+ LL_WARNS("Xfer") << "VFile Xfer Can't seek to position " << start_position << ", file length " << mVFile->getSize() << LL_ENDL;
+ LL_WARNS("Xfer") << "While sending file " << mLocalID << LL_ENDL;
return -1;
}
@@ -288,12 +340,31 @@ S32 LLXfer_VFile::processEOF()
if (!mCallbackResult)
{
- LLVFile file(mVFS, mTempID, mType, LLVFile::WRITE);
- if (! file.rename(mLocalID, mType))
+ if (mVFS->getExists(mTempID, mType))
{
- LL_INFOS() << "copy from temp file failed: unable to rename to " << mLocalID << LL_ENDL;
+ LLVFile file(mVFS, mTempID, mType, LLVFile::WRITE);
+ if (!file.rename(mLocalID, mType))
+ {
+ LL_WARNS("Xfer") << "VFS rename of temp file failed: unable to rename " << mTempID << " to " << mLocalID << LL_ENDL;
+ }
+ else
+ {
+ #ifdef VFS_SPAM
+ // Debugging spam
+ LL_INFOS("Xfer") << "VFS rename of temp file done: renamed " << mTempID << " to " << mLocalID
+ << " LLVFile size is " << file.getSize()
+ << LL_ENDL;
+ #endif
+
+ // Rename worked: the original file is gone. Clear mDeleteTempFile
+ // so we don't attempt to delete the file in cleanup()
+ mDeleteTempFile = FALSE;
+ }
+ }
+ else
+ {
+ LL_WARNS("Xfer") << "LLXfer_VFile::processEOF() can't open for renaming VFS file " << mTempID << "." << LLAssetType::lookup(mType) << LL_ENDL;
}
-
}
if (mVFile)
@@ -336,3 +407,4 @@ U32 LLXfer_VFile::getXferTypeTag()
{
return LLXfer::XFER_VFILE;
}
+
diff --git a/indra/llmessage/llxfer_vfile.h b/indra/llmessage/llxfer_vfile.h
index 048bf49dcc..5bf9a5cfba 100644
--- a/indra/llmessage/llxfer_vfile.h
+++ b/indra/llmessage/llxfer_vfile.h
@@ -47,6 +47,8 @@ class LLXfer_VFile : public LLXfer
std::string mName;
+ BOOL mDeleteTempFile;
+
public:
LLXfer_VFile ();
LLXfer_VFile (LLVFS *vfs, const LLUUID &local_id, LLAssetType::EType type);
@@ -66,8 +68,10 @@ class LLXfer_VFile : public LLXfer
virtual S32 startDownload();
virtual S32 processEOF();
-
- virtual S32 startSend (U64 xfer_id, const LLHost &remote_host);
+
+ virtual S32 startSend(U64 xfer_id, const LLHost &remote_host);
+ virtual void closeFileHandle();
+ virtual S32 reopenFileHandle();
virtual S32 suck(S32 start_position);
virtual S32 flush();
diff --git a/indra/llmessage/llxfermanager.cpp b/indra/llmessage/llxfermanager.cpp
index 2ceb64ce8f..4cea886c8a 100644
--- a/indra/llmessage/llxfermanager.cpp
+++ b/indra/llmessage/llxfermanager.cpp
@@ -44,9 +44,15 @@ const S32 LL_PACKET_RETRY_LIMIT = 10; // packet retransmission limit
const S32 LL_DEFAULT_MAX_SIMULTANEOUS_XFERS = 10;
const S32 LL_DEFAULT_MAX_REQUEST_FIFO_XFERS = 1000;
-#define LL_XFER_PROGRESS_MESSAGES 0
-#define LL_XFER_TEST_REXMIT 0
+// Kills the connection if a viewer download queue hits this many requests backed up
+// Also set in simulator.xml at "hard_limit_outgoing_xfers_per_circuit"
+const S32 LL_DEFAULT_MAX_HARD_LIMIT_SIMULTANEOUS_XFERS = 500;
+// Use this to show sending some ConfirmXferPacket messages
+//#define LL_XFER_PROGRESS_MESSAGES 1
+
+// Use this for lots of diagnostic spam
+//#define LL_XFER_DIAGNOISTIC_LOGGING 1
///////////////////////////////////////////////////////////
@@ -66,10 +72,10 @@ LLXferManager::~LLXferManager ()
void LLXferManager::init (LLVFS *vfs)
{
- mSendList = NULL;
- mReceiveList = NULL;
+ cleanup();
setMaxOutgoingXfersPerCircuit(LL_DEFAULT_MAX_SIMULTANEOUS_XFERS);
+ setHardLimitOutgoingXfersPerCircuit(LL_DEFAULT_MAX_HARD_LIMIT_SIMULTANEOUS_XFERS);
setMaxIncomingXfers(LL_DEFAULT_MAX_REQUEST_FIFO_XFERS);
mVFS = vfs;
@@ -83,29 +89,14 @@ void LLXferManager::init (LLVFS *vfs)
void LLXferManager::cleanup ()
{
- LLXfer *xferp;
- LLXfer *delp;
-
for_each(mOutgoingHosts.begin(), mOutgoingHosts.end(), DeletePointer());
mOutgoingHosts.clear();
- delp = mSendList;
- while (delp)
- {
- xferp = delp->mNext;
- delete delp;
- delp = xferp;
- }
- mSendList = NULL;
+ for_each(mSendList.begin(), mSendList.end(), DeletePointer());
+ mSendList.clear();
- delp = mReceiveList;
- while (delp)
- {
- xferp = delp->mNext;
- delete delp;
- delp = xferp;
- }
- mReceiveList = NULL;
+ for_each(mReceiveList.begin(), mReceiveList.end(), DeletePointer());
+ mReceiveList.clear();
}
///////////////////////////////////////////////////////////
@@ -122,6 +113,11 @@ void LLXferManager::setMaxOutgoingXfersPerCircuit(S32 max_num)
mMaxOutgoingXfersPerCircuit = max_num;
}
+void LLXferManager::setHardLimitOutgoingXfersPerCircuit(S32 max_num)
+{
+ mHardLimitOutgoingXfersPerCircuit = max_num;
+}
+
void LLXferManager::setUseAckThrottling(const BOOL use)
{
mUseAckThrottling = use;
@@ -140,6 +136,11 @@ void LLXferManager::setAckThrottleBPS(const F32 bps)
F32 actual_rate = llmax(min_bps*1.1f, bps);
LL_DEBUGS("AppInit") << "LLXferManager ack throttle min rate: " << min_bps << LL_ENDL;
LL_DEBUGS("AppInit") << "LLXferManager ack throttle actual rate: " << actual_rate << LL_ENDL;
+ #ifdef LL_XFER_DIAGNOISTIC_LOGGING
+ LL_INFOS("Xfer") << "LLXferManager ack throttle min rate: " << min_bps << LL_ENDL;
+ LL_INFOS("Xfer") << "LLXferManager ack throttle actual rate: " << actual_rate << LL_ENDL;
+ #endif // LL_XFER_DIAGNOISTIC_LOGGING
+
mAckThrottle.setRate(actual_rate);
}
@@ -148,45 +149,71 @@ void LLXferManager::setAckThrottleBPS(const F32 bps)
void LLXferManager::updateHostStatus()
{
- LLXfer *xferp;
- LLHostStatus *host_statusp = NULL;
-
+ // Clear the outgoing host list
for_each(mOutgoingHosts.begin(), mOutgoingHosts.end(), DeletePointer());
mOutgoingHosts.clear();
-
- for (xferp = mSendList; xferp; xferp = xferp->mNext)
+
+ // Loop through all outgoing xfers and re-build mOutgoingHosts
+ for (xfer_list_t::iterator send_iter = mSendList.begin();
+ send_iter != mSendList.end(); ++send_iter)
{
+ LLHostStatus *host_statusp = NULL;
for (status_list_t::iterator iter = mOutgoingHosts.begin();
iter != mOutgoingHosts.end(); ++iter)
{
- host_statusp = *iter;
- if (host_statusp->mHost == xferp->mRemoteHost)
- {
+ if ((*iter)->mHost == (*send_iter)->mRemoteHost)
+ { // Already have this host
+ host_statusp = *iter;
break;
}
}
if (!host_statusp)
- {
+ { // Don't have this host, so add it
host_statusp = new LLHostStatus();
if (host_statusp)
{
- host_statusp->mHost = xferp->mRemoteHost;
+ host_statusp->mHost = (*send_iter)->mRemoteHost;
mOutgoingHosts.push_front(host_statusp);
}
}
if (host_statusp)
- {
- if (xferp->mStatus == e_LL_XFER_PENDING)
+ { // Do the accounting
+ if ((*send_iter)->mStatus == e_LL_XFER_PENDING)
{
host_statusp->mNumPending++;
}
- else if (xferp->mStatus == e_LL_XFER_IN_PROGRESS)
+ else if ((*send_iter)->mStatus == e_LL_XFER_IN_PROGRESS)
{
host_statusp->mNumActive++;
}
}
-
}
+
+#ifdef LL_XFER_DIAGNOISTIC_LOGGING
+ for (xfer_list_t::iterator send_iter = mSendList.begin();
+ send_iter != mSendList.end(); ++send_iter)
+ {
+ LLXfer * xferp = *send_iter;
+ LL_INFOS("Xfer") << "xfer to host " << xferp->mRemoteHost
+ << " is " << xferp->mXferSize << " bytes"
+ << ", status " << (S32)(xferp->mStatus)
+ << ", waiting for ACK: " << (S32)(xferp->mWaitingForACK)
+ << " in frame " << (S32) LLFrameTimer::getFrameCount()
+ << LL_ENDL;
+ }
+
+ for (status_list_t::iterator iter = mOutgoingHosts.begin();
+ iter != mOutgoingHosts.end(); ++iter)
+ {
+ LL_INFOS("Xfer") << "LLXfer host " << (*iter)->mHost.getIPandPort()
+ << " has " << (*iter)->mNumActive
+ << " active, " << (*iter)->mNumPending
+ << " pending"
+ << " in frame " << (S32) LLFrameTimer::getFrameCount()
+ << LL_ENDL;
+ }
+#endif // LL_XFER_DIAGNOISTIC_LOGGING
+
}
///////////////////////////////////////////////////////////
@@ -196,27 +223,28 @@ void LLXferManager::printHostStatus()
LLHostStatus *host_statusp = NULL;
if (!mOutgoingHosts.empty())
{
- LL_INFOS() << "Outgoing Xfers:" << LL_ENDL;
+ LL_INFOS("Xfer") << "Outgoing Xfers:" << LL_ENDL;
for (status_list_t::iterator iter = mOutgoingHosts.begin();
iter != mOutgoingHosts.end(); ++iter)
{
host_statusp = *iter;
- LL_INFOS() << " " << host_statusp->mHost << " active: " << host_statusp->mNumActive << " pending: " << host_statusp->mNumPending << LL_ENDL;
+ LL_INFOS("Xfer") << " " << host_statusp->mHost << " active: " << host_statusp->mNumActive << " pending: " << host_statusp->mNumPending << LL_ENDL;
}
}
}
///////////////////////////////////////////////////////////
-LLXfer *LLXferManager::findXfer (U64 id, LLXfer *list_head)
+LLXfer * LLXferManager::findXferByID(U64 id, xfer_list_t & xfer_list)
{
- LLXfer *xferp;
- for (xferp = list_head; xferp; xferp = xferp->mNext)
+ for (xfer_list_t::iterator iter = xfer_list.begin();
+ iter != xfer_list.end();
+ ++iter)
{
- if (xferp->mID == id)
+ if ((*iter)->mID == id)
{
- return(xferp);
+ return(*iter);
}
}
return(NULL);
@@ -225,29 +253,34 @@ LLXfer *LLXferManager::findXfer (U64 id, LLXfer *list_head)
///////////////////////////////////////////////////////////
-void LLXferManager::removeXfer (LLXfer *delp, LLXfer **list_head)
+// WARNING: this invalidates iterators from xfer_list
+void LLXferManager::removeXfer(LLXfer *delp, xfer_list_t & xfer_list)
{
- // This function assumes that delp will only occur in the list
- // zero or one times.
if (delp)
- {
- if (*list_head == delp)
+ {
+ std::string direction = "send";
+ if (&xfer_list == &mReceiveList)
{
- *list_head = delp->mNext;
- delete (delp);
+ direction = "receive";
}
- else
+
+ // This assumes that delp will occur in the list once at most
+ // Find the pointer in the list
+ for (xfer_list_t::iterator iter = xfer_list.begin();
+ iter != xfer_list.end();
+ ++iter)
{
- LLXfer *xferp = *list_head;
- while (xferp->mNext)
+ if ((*iter) == delp)
{
- if (xferp->mNext == delp)
- {
- xferp->mNext = delp->mNext;
- delete (delp);
- break;
- }
- xferp = xferp->mNext;
+ LL_DEBUGS("Xfer") << "Deleting xfer to host " << (*iter)->mRemoteHost
+ << " of " << (*iter)->mXferSize << " bytes"
+ << ", status " << (S32)((*iter)->mStatus)
+ << " from the " << direction << " list"
+ << LL_ENDL;
+
+ xfer_list.erase(iter);
+ delete (delp);
+ break;
}
}
}
@@ -255,35 +288,30 @@ void LLXferManager::removeXfer (LLXfer *delp, LLXfer **list_head)
///////////////////////////////////////////////////////////
-U32 LLXferManager::numActiveListEntries(LLXfer *list_head)
+LLHostStatus * LLXferManager::findHostStatus(const LLHost &host)
{
- U32 num_entries = 0;
+ LLHostStatus *host_statusp = NULL;
- while (list_head)
+ for (status_list_t::iterator iter = mOutgoingHosts.begin();
+ iter != mOutgoingHosts.end(); ++iter)
{
- if (list_head->mStatus == e_LL_XFER_IN_PROGRESS)
+ host_statusp = *iter;
+ if (host_statusp->mHost == host)
{
- num_entries++;
+ return (host_statusp);
}
- list_head = list_head->mNext;
}
- return(num_entries);
+ return 0;
}
///////////////////////////////////////////////////////////
-
+
S32 LLXferManager::numPendingXfers(const LLHost &host)
{
- LLHostStatus *host_statusp = NULL;
-
- for (status_list_t::iterator iter = mOutgoingHosts.begin();
- iter != mOutgoingHosts.end(); ++iter)
+ LLHostStatus *host_statusp = findHostStatus(host);
+ if (host_statusp)
{
- host_statusp = *iter;
- if (host_statusp->mHost == host)
- {
- return (host_statusp->mNumPending);
- }
+ return host_statusp->mNumPending;
}
return 0;
}
@@ -292,16 +320,10 @@ S32 LLXferManager::numPendingXfers(const LLHost &host)
S32 LLXferManager::numActiveXfers(const LLHost &host)
{
- LLHostStatus *host_statusp = NULL;
-
- for (status_list_t::iterator iter = mOutgoingHosts.begin();
- iter != mOutgoingHosts.end(); ++iter)
+ LLHostStatus *host_statusp = findHostStatus(host);
+ if (host_statusp)
{
- host_statusp = *iter;
- if (host_statusp->mHost == host)
- {
- return (host_statusp->mNumActive);
- }
+ return host_statusp->mNumActive;
}
return 0;
}
@@ -372,35 +394,6 @@ BOOL LLXferManager::isLastPacket(S32 packet_num)
///////////////////////////////////////////////////////////
-U64 LLXferManager::registerXfer(const void *datap, const S32 length)
-{
- LLXfer *xferp;
- U64 xfer_id = getNextID();
-
- xferp = (LLXfer *) new LLXfer_Mem();
- if (xferp)
- {
- xferp->mNext = mSendList;
- mSendList = xferp;
-
- xfer_id = ((LLXfer_Mem *)xferp)->registerXfer(xfer_id, datap,length);
-
- if (!xfer_id)
- {
- removeXfer(xferp,&mSendList);
- }
- }
- else
- {
- LL_ERRS() << "Xfer allocation error" << LL_ENDL;
- xfer_id = 0;
- }
-
- return(xfer_id);
-}
-
-///////////////////////////////////////////////////////////
-
U64 LLXferManager::requestFile(const std::string& local_filename,
const std::string& remote_filename,
ELLPath remote_path,
@@ -411,43 +404,46 @@ U64 LLXferManager::requestFile(const std::string& local_filename,
BOOL is_priority,
BOOL use_big_packets)
{
- LLXfer *xferp;
+ LLXfer_File* file_xfer_p = NULL;
- for (xferp = mReceiveList; xferp ; xferp = xferp->mNext)
+ // First check to see if it's already requested
+ for (xfer_list_t::iterator iter = mReceiveList.begin();
+ iter != mReceiveList.end(); ++iter)
{
- if (xferp->getXferTypeTag() == LLXfer::XFER_FILE
- && (((LLXfer_File*)xferp)->matchesLocalFilename(local_filename))
- && (((LLXfer_File*)xferp)->matchesRemoteFilename(remote_filename, remote_path))
- && (remote_host == xferp->mRemoteHost)
- && (callback == xferp->mCallback)
- && (user_data == xferp->mCallbackDataHandle))
-
+ if ((*iter)->getXferTypeTag() == LLXfer::XFER_FILE)
{
- // cout << "requested a xfer already in progress" << endl;
- return xferp->mID;
+ file_xfer_p = (LLXfer_File*)(*iter);
+ if (file_xfer_p->matchesLocalFilename(local_filename)
+ && file_xfer_p->matchesRemoteFilename(remote_filename, remote_path)
+ && (remote_host == file_xfer_p->mRemoteHost)
+ && (callback == file_xfer_p->mCallback)
+ && (user_data == file_xfer_p->mCallbackDataHandle))
+ {
+ // Already have the request (already in progress)
+ return (*iter)->mID;
+ }
}
}
U64 xfer_id = 0;
S32 chunk_size = use_big_packets ? LL_XFER_LARGE_PAYLOAD : -1;
- xferp = (LLXfer *) new LLXfer_File(chunk_size);
- if (xferp)
+ file_xfer_p = new LLXfer_File(chunk_size);
+ if (file_xfer_p)
{
- addToList(xferp, mReceiveList, is_priority);
+ addToList(file_xfer_p, mReceiveList, is_priority);
// Remove any file by the same name that happens to be lying
// around.
// Note: according to AaronB, this is here to deal with locks on files that were
// in transit during a crash,
- if( delete_remote_on_completion
- && (remote_filename.substr(remote_filename.length()-4) == ".tmp")
- && gDirUtilp->fileExists(local_filename))
+ if(delete_remote_on_completion &&
+ (remote_filename.substr(remote_filename.length()-4) == ".tmp"))
{
LLFile::remove(local_filename, ENOENT);
}
xfer_id = getNextID();
- ((LLXfer_File *)xferp)->initializeRequest(
+ file_xfer_p->initializeRequest(
xfer_id,
local_filename,
remote_filename,
@@ -459,39 +455,11 @@ U64 LLXferManager::requestFile(const std::string& local_filename,
}
else
{
- LL_ERRS() << "Xfer allocation error" << LL_ENDL;
+ LL_ERRS("Xfer") << "Xfer allocation error" << LL_ENDL;
}
return xfer_id;
}
-void LLXferManager::requestFile(const std::string& remote_filename,
- ELLPath remote_path,
- const LLHost& remote_host,
- BOOL delete_remote_on_completion,
- void (*callback)(void*,S32,void**,S32,LLExtStat),
- void** user_data,
- BOOL is_priority)
-{
- LLXfer *xferp;
-
- xferp = (LLXfer *) new LLXfer_Mem();
- if (xferp)
- {
- addToList(xferp, mReceiveList, is_priority);
- ((LLXfer_Mem *)xferp)->initializeRequest(getNextID(),
- remote_filename,
- remote_path,
- remote_host,
- delete_remote_on_completion,
- callback, user_data);
- startPendingDownloads();
- }
- else
- {
- LL_ERRS() << "Xfer allocation error" << LL_ENDL;
- }
-}
-
void LLXferManager::requestVFile(const LLUUID& local_id,
const LLUUID& remote_id,
LLAssetType::EType type, LLVFS* vfs,
@@ -500,28 +468,46 @@ void LLXferManager::requestVFile(const LLUUID& local_id,
void** user_data,
BOOL is_priority)
{
- LLXfer *xferp;
-
- for (xferp = mReceiveList; xferp ; xferp = xferp->mNext)
- {
- if (xferp->getXferTypeTag() == LLXfer::XFER_VFILE
- && (((LLXfer_VFile*)xferp)->matchesLocalFile(local_id, type))
- && (((LLXfer_VFile*)xferp)->matchesRemoteFile(remote_id, type))
- && (remote_host == xferp->mRemoteHost)
- && (callback == xferp->mCallback)
- && (user_data == xferp->mCallbackDataHandle))
+ LLXfer_VFile * xfer_p = NULL;
+ for (xfer_list_t::iterator iter = mReceiveList.begin();
+ iter != mReceiveList.end(); ++iter)
+ { // Find any matching existing requests
+ if ((*iter)->getXferTypeTag() == LLXfer::XFER_VFILE)
{
- // cout << "requested a xfer already in progress" << endl;
- return;
+ xfer_p = (LLXfer_VFile*) (*iter);
+ if (xfer_p->matchesLocalFile(local_id, type)
+ && xfer_p->matchesRemoteFile(remote_id, type)
+ && (remote_host == xfer_p->mRemoteHost)
+ && (callback == xfer_p->mCallback)
+ && (user_data == xfer_p->mCallbackDataHandle))
+
+ { // Have match, don't add a duplicate
+ #ifdef LL_XFER_DIAGNOISTIC_LOGGING
+ LL_INFOS("Xfer") << "Dropping duplicate xfer request for " << remote_id
+ << " on " << remote_host.getIPandPort()
+ << " local id " << local_id
+ << LL_ENDL;
+ #endif // LL_XFER_DIAGNOISTIC_LOGGING
+
+ return;
+ }
}
}
- xferp = (LLXfer *) new LLXfer_VFile();
- if (xferp)
+ xfer_p = new LLXfer_VFile();
+ if (xfer_p)
{
- addToList(xferp, mReceiveList, is_priority);
- ((LLXfer_VFile *)xferp)->initializeRequest(getNextID(),
+ #ifdef LL_XFER_DIAGNOISTIC_LOGGING
+ LL_INFOS("Xfer") << "Starting file xfer for " << remote_id
+ << " type " << LLAssetType::lookupHumanReadable(type)
+ << " from " << xfer_p->mRemoteHost.getIPandPort()
+ << ", local id " << local_id
+ << LL_ENDL;
+ #endif // LL_XFER_DIAGNOISTIC_LOGGING
+
+ addToList(xfer_p, mReceiveList, is_priority);
+ ((LLXfer_VFile *)xfer_p)->initializeRequest(getNextID(),
vfs,
local_id,
remote_id,
@@ -533,78 +519,18 @@ void LLXferManager::requestVFile(const LLUUID& local_id,
}
else
{
- LL_ERRS() << "Xfer allocation error" << LL_ENDL;
- }
-
-}
-
-/*
-void LLXferManager::requestXfer(
- const std::string& local_filename,
- BOOL delete_remote_on_completion,
- U64 xfer_id,
- const LLHost &remote_host,
- void (*callback)(void **,S32),
- void **user_data)
-{
- LLXfer *xferp;
-
- for (xferp = mReceiveList; xferp ; xferp = xferp->mNext)
- {
- if (xferp->getXferTypeTag() == LLXfer::XFER_FILE
- && (((LLXfer_File*)xferp)->matchesLocalFilename(local_filename))
- && (xfer_id == xferp->mID)
- && (remote_host == xferp->mRemoteHost)
- && (callback == xferp->mCallback)
- && (user_data == xferp->mCallbackDataHandle))
-
- {
- // cout << "requested a xfer already in progress" << endl;
- return;
- }
+ LL_ERRS("Xfer") << "Xfer allocation error" << LL_ENDL;
}
- xferp = (LLXfer *) new LLXfer_File();
- if (xferp)
- {
- xferp->mNext = mReceiveList;
- mReceiveList = xferp;
-
- ((LLXfer_File *)xferp)->initializeRequest(xfer_id,local_filename,"",LL_PATH_NONE,remote_host,delete_remote_on_completion,callback,user_data);
- startPendingDownloads();
- }
- else
- {
- LL_ERRS() << "Xfer allcoation error" << LL_ENDL;
- }
}
-void LLXferManager::requestXfer(U64 xfer_id, const LLHost &remote_host, BOOL delete_remote_on_completion, void (*callback)(void *,S32,void **,S32),void **user_data)
-{
- LLXfer *xferp;
-
- xferp = (LLXfer *) new LLXfer_Mem();
- if (xferp)
- {
- xferp->mNext = mReceiveList;
- mReceiveList = xferp;
-
- ((LLXfer_Mem *)xferp)->initializeRequest(xfer_id,"",LL_PATH_NONE,remote_host,delete_remote_on_completion,callback,user_data);
- startPendingDownloads();
- }
- else
- {
- LL_ERRS() << "Xfer allcoation error" << LL_ENDL;
- }
-}
-*/
///////////////////////////////////////////////////////////
void LLXferManager::processReceiveData (LLMessageSystem *mesgsys, void ** /*user_data*/)
{
// there's sometimes an extra 4 bytes added to an xfer payload
const S32 BUF_SIZE = LL_XFER_LARGE_PAYLOAD + 4;
- char fdata_buf[LL_XFER_LARGE_PAYLOAD + 4]; /* Flawfinder : ignore */
+ char fdata_buf[BUF_SIZE]; /* Flawfinder : ignore */
S32 fdata_size;
U64 id;
S32 packetnum;
@@ -614,14 +540,24 @@ void LLXferManager::processReceiveData (LLMessageSystem *mesgsys, void ** /*user
mesgsys->getS32Fast(_PREHASH_XferID, _PREHASH_Packet, packetnum);
fdata_size = mesgsys->getSizeFast(_PREHASH_DataPacket,_PREHASH_Data);
- mesgsys->getBinaryDataFast(_PREHASH_DataPacket, _PREHASH_Data, fdata_buf, 0, 0, BUF_SIZE);
-
- xferp = findXfer(id, mReceiveList);
+ if (fdata_size < 0 ||
+ fdata_size > BUF_SIZE)
+ {
+ char U64_BUF[MAX_STRING]; /* Flawfinder : ignore */
+ LL_WARNS("Xfer") << "Received invalid xfer data size of " << fdata_size
+ << " in packet number " << packetnum
+ << " from " << mesgsys->getSender()
+ << " for xfer id: " << U64_to_str(id, U64_BUF, sizeof(U64_BUF))
+ << LL_ENDL;
+ return;
+ }
+ mesgsys->getBinaryDataFast(_PREHASH_DataPacket, _PREHASH_Data, fdata_buf, fdata_size, 0, BUF_SIZE);
- if (!xferp)
+ xferp = findXferByID(id, mReceiveList);
+ if (!xferp)
{
char U64_BUF[MAX_STRING]; /* Flawfinder : ignore */
- LL_INFOS() << "received xfer data from " << mesgsys->getSender()
+ LL_WARNS("Xfer") << "received xfer data from " << mesgsys->getSender()
<< " for non-existent xfer id: "
<< U64_to_str(id, U64_BUF, sizeof(U64_BUF)) << LL_ENDL;
return;
@@ -634,11 +570,11 @@ void LLXferManager::processReceiveData (LLMessageSystem *mesgsys, void ** /*user
// confirm it if it was a resend of the last one, since the confirmation might have gotten dropped
if (decodePacketNum(packetnum) == (xferp->mPacketNum - 1))
{
- LL_INFOS() << "Reconfirming xfer " << xferp->mRemoteHost << ":" << xferp->getFileName() << " packet " << packetnum << LL_ENDL; sendConfirmPacket(mesgsys, id, decodePacketNum(packetnum), mesgsys->getSender());
+ LL_INFOS("Xfer") << "Reconfirming xfer " << xferp->mRemoteHost << ":" << xferp->getFileName() << " packet " << packetnum << LL_ENDL; sendConfirmPacket(mesgsys, id, decodePacketNum(packetnum), mesgsys->getSender());
}
else
{
- LL_INFOS() << "Ignoring xfer " << xferp->mRemoteHost << ":" << xferp->getFileName() << " recv'd packet " << packetnum << "; expecting " << xferp->mPacketNum << LL_ENDL;
+ LL_INFOS("Xfer") << "Ignoring xfer " << xferp->mRemoteHost << ":" << xferp->getFileName() << " recv'd packet " << packetnum << "; expecting " << xferp->mPacketNum << LL_ENDL;
}
return;
}
@@ -663,7 +599,7 @@ void LLXferManager::processReceiveData (LLMessageSystem *mesgsys, void ** /*user
if (result == LL_ERR_CANNOT_OPEN_FILE)
{
xferp->abort(LL_ERR_CANNOT_OPEN_FILE);
- removeXfer(xferp,&mReceiveList);
+ removeXfer(xferp,mReceiveList);
startPendingDownloads();
return;
}
@@ -688,7 +624,7 @@ void LLXferManager::processReceiveData (LLMessageSystem *mesgsys, void ** /*user
if (isLastPacket(packetnum))
{
xferp->processEOF();
- removeXfer(xferp,&mReceiveList);
+ removeXfer(xferp,mReceiveList);
startPendingDownloads();
}
}
@@ -697,7 +633,7 @@ void LLXferManager::processReceiveData (LLMessageSystem *mesgsys, void ** /*user
void LLXferManager::sendConfirmPacket (LLMessageSystem *mesgsys, U64 id, S32 packetnum, const LLHost &remote_host)
{
-#if LL_XFER_PROGRESS_MESSAGES
+#ifdef LL_XFER_PROGRESS_MESSAGES
if (!(packetnum % 50))
{
cout << "confirming xfer packet #" << packetnum << endl;
@@ -708,6 +644,7 @@ void LLXferManager::sendConfirmPacket (LLMessageSystem *mesgsys, U64 id, S32 pac
mesgsys->addU64Fast(_PREHASH_ID, id);
mesgsys->addU32Fast(_PREHASH_Packet, packetnum);
+ // Ignore a circuit failure here, we'll catch it with another message
mesgsys->sendMessage(remote_host);
}
@@ -746,6 +683,28 @@ bool LLXferManager::validateFileForTransfer(const std::string& filename)
return find_and_remove(mExpectedTransfers, filename);
}
+/* Present in fireengine, not used by viewer
+void LLXferManager::expectVFileForRequest(const std::string& filename)
+{
+ mExpectedVFileRequests.insert(filename);
+}
+
+bool LLXferManager::validateVFileForRequest(const std::string& filename)
+{
+ return find_and_remove(mExpectedVFileRequests, filename);
+}
+
+void LLXferManager::expectVFileForTransfer(const std::string& filename)
+{
+ mExpectedVFileTransfers.insert(filename);
+}
+
+bool LLXferManager::validateVFileForTransfer(const std::string& filename)
+{
+ return find_and_remove(mExpectedVFileTransfers, filename);
+}
+*/
+
static bool remove_prefix(std::string& filename, const std::string& prefix)
{
if (std::equal(prefix.begin(), prefix.end(), filename.begin()))
@@ -807,7 +766,7 @@ void LLXferManager::processFileRequest (LLMessageSystem *mesgsys, void ** /*user
mesgsys->getU64Fast(_PREHASH_XferID, _PREHASH_ID, id);
char U64_BUF[MAX_STRING]; /* Flawfinder : ignore */
- LL_INFOS() << "xfer request id: " << U64_to_str(id, U64_BUF, sizeof(U64_BUF))
+ LL_INFOS("Xfer") << "xfer request id: " << U64_to_str(id, U64_BUF, sizeof(U64_BUF))
<< " to " << mesgsys->getSender() << LL_ENDL;
mesgsys->getStringFast(_PREHASH_XferID, _PREHASH_Filename, local_filename);
@@ -825,36 +784,45 @@ void LLXferManager::processFileRequest (LLMessageSystem *mesgsys, void ** /*user
LLXfer *xferp;
if (uuid != LLUUID::null)
- {
+ { // Request for an asset - use a VFS file
if(NULL == LLAssetType::lookup(type))
{
- LL_WARNS() << "Invalid type for xfer request: " << uuid << ":"
+ LL_WARNS("Xfer") << "Invalid type for xfer request: " << uuid << ":"
<< type_s16 << " to " << mesgsys->getSender() << LL_ENDL;
return;
}
- LL_INFOS() << "starting vfile transfer: " << uuid << "," << LLAssetType::lookup(type) << " to " << mesgsys->getSender() << LL_ENDL;
-
if (! mVFS)
{
- LL_WARNS() << "Attempt to send VFile w/o available VFS" << LL_ENDL;
+ LL_WARNS("Xfer") << "Attempt to send VFile w/o available VFS" << LL_ENDL;
return;
}
+ /* Present in fireengine, not used by viewer
+ if (!validateVFileForTransfer(uuid.asString()))
+ {
+ // it is up to the app sending the file to mark it for expected
+ // transfer before the request arrives or it will be dropped
+ LL_WARNS("Xfer") << "SECURITY: Unapproved VFile '" << uuid << "'" << LL_ENDL;
+ return;
+ }
+ */
+
+ LL_INFOS("Xfer") << "starting vfile transfer: " << uuid << "," << LLAssetType::lookup(type) << " to " << mesgsys->getSender() << LL_ENDL;
+
xferp = (LLXfer *)new LLXfer_VFile(mVFS, uuid, type);
if (xferp)
{
- xferp->mNext = mSendList;
- mSendList = xferp;
+ mSendList.push_front(xferp);
result = xferp->startSend(id,mesgsys->getSender());
}
else
{
- LL_ERRS() << "Xfer allcoation error" << LL_ENDL;
+ LL_ERRS("Xfer") << "Xfer allcoation error" << LL_ENDL;
}
}
else if (!local_filename.empty())
- {
+ { // Was given a file name to send
// See DEV-21775 for detailed security issues
if (local_path == LL_PATH_NONE)
@@ -873,7 +841,7 @@ void LLXferManager::processFileRequest (LLMessageSystem *mesgsys, void ** /*user
case LL_PATH_NONE:
if(!validateFileForTransfer(local_filename))
{
- LL_WARNS() << "SECURITY: Unapproved filename '" << local_filename << LL_ENDL;
+ LL_WARNS("Xfer") << "SECURITY: Unapproved filename '" << local_filename << LL_ENDL;
return;
}
break;
@@ -881,13 +849,13 @@ void LLXferManager::processFileRequest (LLMessageSystem *mesgsys, void ** /*user
case LL_PATH_CACHE:
if(!verify_cache_filename(local_filename))
{
- LL_WARNS() << "SECURITY: Illegal cache filename '" << local_filename << LL_ENDL;
+ LL_WARNS("Xfer") << "SECURITY: Illegal cache filename '" << local_filename << LL_ENDL;
return;
}
break;
default:
- LL_WARNS() << "SECURITY: Restricted file dir enum: " << (U32)local_path << LL_ENDL;
+ LL_WARNS("Xfer") << "SECURITY: Restricted file dir enum: " << (U32)local_path << LL_ENDL;
return;
}
@@ -902,7 +870,7 @@ void LLXferManager::processFileRequest (LLMessageSystem *mesgsys, void ** /*user
{
expanded_filename = local_filename;
}
- LL_INFOS() << "starting file transfer: " << expanded_filename << " to " << mesgsys->getSender() << LL_ENDL;
+ LL_INFOS("Xfer") << "starting file transfer: " << expanded_filename << " to " << mesgsys->getSender() << LL_ENDL;
BOOL delete_local_on_completion = FALSE;
mesgsys->getBOOL("XferID", "DeleteOnCompletion", delete_local_on_completion);
@@ -912,23 +880,22 @@ void LLXferManager::processFileRequest (LLMessageSystem *mesgsys, void ** /*user
if (xferp)
{
- xferp->mNext = mSendList;
- mSendList = xferp;
+ mSendList.push_front(xferp);
result = xferp->startSend(id,mesgsys->getSender());
}
else
{
- LL_ERRS() << "Xfer allcoation error" << LL_ENDL;
+ LL_ERRS("Xfer") << "Xfer allcoation error" << LL_ENDL;
}
}
else
- {
+ { // no uuid or filename - use the ID sent
char U64_BUF[MAX_STRING]; /* Flawfinder : ignore */
- LL_INFOS() << "starting memory transfer: "
+ LL_INFOS("Xfer") << "starting memory transfer: "
<< U64_to_str(id, U64_BUF, sizeof(U64_BUF)) << " to "
<< mesgsys->getSender() << LL_ENDL;
- xferp = findXfer(id, mSendList);
+ xferp = findXferByID(id, mSendList);
if (xferp)
{
@@ -936,7 +903,7 @@ void LLXferManager::processFileRequest (LLMessageSystem *mesgsys, void ** /*user
}
else
{
- LL_INFOS() << "Warning: " << U64_BUF << " not found." << LL_ENDL;
+ LL_INFOS("Xfer") << "Warning: xfer ID " << U64_BUF << " not found." << LL_ENDL;
result = LL_ERR_FILE_NOT_FOUND;
}
}
@@ -946,11 +913,11 @@ void LLXferManager::processFileRequest (LLMessageSystem *mesgsys, void ** /*user
if (xferp)
{
xferp->abort(result);
- removeXfer(xferp,&mSendList);
+ removeXfer(xferp, mSendList);
}
else // can happen with a memory transfer not found
{
- LL_INFOS() << "Aborting xfer to " << mesgsys->getSender() << " with error: " << result << LL_ENDL;
+ LL_INFOS("Xfer") << "Aborting xfer to " << mesgsys->getSender() << " with error: " << result << LL_ENDL;
mesgsys->newMessageFast(_PREHASH_AbortXfer);
mesgsys->nextBlockFast(_PREHASH_XferID);
@@ -960,24 +927,86 @@ void LLXferManager::processFileRequest (LLMessageSystem *mesgsys, void ** /*user
mesgsys->sendMessage(mesgsys->getSender());
}
}
- else if(xferp && (numActiveXfers(xferp->mRemoteHost) < mMaxOutgoingXfersPerCircuit))
+ else if(xferp)
{
- xferp->sendNextPacket();
- changeNumActiveXfers(xferp->mRemoteHost,1);
-// LL_INFOS() << "***STARTING XFER IMMEDIATELY***" << LL_ENDL;
- }
- else
- {
- if(xferp)
+ // Figure out how many transfers the host has requested
+ LLHostStatus *host_statusp = findHostStatus(xferp->mRemoteHost);
+ if (host_statusp)
{
- LL_INFOS() << " queueing xfer request, " << numPendingXfers(xferp->mRemoteHost) << " ahead of this one" << LL_ENDL;
+ if (host_statusp->mNumActive < mMaxOutgoingXfersPerCircuit)
+ { // Not many transfers in progress already, so start immediately
+ xferp->sendNextPacket();
+ changeNumActiveXfers(xferp->mRemoteHost,1);
+ LL_DEBUGS("Xfer") << "Starting xfer ID " << U64_to_str(id) << " immediately" << LL_ENDL;
+ }
+ else if (mHardLimitOutgoingXfersPerCircuit == 0 ||
+ (host_statusp->mNumActive + host_statusp->mNumPending) < mHardLimitOutgoingXfersPerCircuit)
+ { // Must close the file handle and wait for earlier ones to complete
+ LL_INFOS("Xfer") << " queueing xfer request id " << U64_to_str(id) << ", "
+ << host_statusp->mNumActive << " active and "
+ << host_statusp->mNumPending << " pending ahead of this one"
+ << LL_ENDL;
+ xferp->closeFileHandle(); // Close the file handle until we're ready to send again
+ }
+ else if (mHardLimitOutgoingXfersPerCircuit > 0)
+ { // Way too many requested ... it's time to stop being nice and kill the circuit
+ xferp->closeFileHandle(); // Close the file handle in any case
+ LLCircuitData *cdp = gMessageSystem->mCircuitInfo.findCircuit(xferp->mRemoteHost);
+ if (cdp)
+ {
+ if (cdp->getTrusted())
+ { // Trusted internal circuit - don't kill it
+ LL_WARNS("Xfer") << "Trusted circuit to " << xferp->mRemoteHost << " has too many xfer requests in the queue "
+ << host_statusp->mNumActive << " active and "
+ << host_statusp->mNumPending << " pending ahead of this one"
+ << LL_ENDL;
+ }
+ else
+ { // Untrusted circuit - time to stop messing around and kill it
+ LL_WARNS("Xfer") << "Killing circuit to " << xferp->mRemoteHost << " for having too many xfer requests in the queue "
+ << host_statusp->mNumActive << " active and "
+ << host_statusp->mNumPending << " pending ahead of this one"
+ << LL_ENDL;
+ gMessageSystem->disableCircuit(xferp->mRemoteHost);
+ }
+ }
+ else
+ { // WTF? Why can't we find a circuit? Try to kill it off
+ LL_WARNS("Xfer") << "Backlog with circuit to " << xferp->mRemoteHost << " with too many xfer requests in the queue "
+ << host_statusp->mNumActive << " active and "
+ << host_statusp->mNumPending << " pending ahead of this one"
+ << " but no LLCircuitData found???"
+ << LL_ENDL;
+ gMessageSystem->disableCircuit(xferp->mRemoteHost);
+ }
+ }
}
else
{
- LL_WARNS() << "LLXferManager::processFileRequest() - no xfer found!"
- << LL_ENDL;
+ LL_WARNS("Xfer") << "LLXferManager::processFileRequest() - no LLHostStatus found for id " << U64_to_str(id)
+ << " host " << xferp->mRemoteHost << LL_ENDL;
}
}
+ else
+ {
+ LL_WARNS("Xfer") << "LLXferManager::processFileRequest() - no xfer found for id " << U64_to_str(id) << LL_ENDL;
+ }
+}
+
+///////////////////////////////////////////////////////////
+
+// Return true if host is in a transfer-flood sitation. Same check for both internal and external hosts
+bool LLXferManager::isHostFlooded(const LLHost & host)
+{
+ bool flooded = false;
+ LLHostStatus *host_statusp = findHostStatus(host);
+ if (host_statusp)
+ {
+ flooded = (mHardLimitOutgoingXfersPerCircuit > 0 &&
+ (host_statusp->mNumActive + host_statusp->mNumPending) >= (S32)(mHardLimitOutgoingXfersPerCircuit * 0.8f));
+ }
+
+ return flooded;
}
@@ -991,7 +1020,7 @@ void LLXferManager::processConfirmation (LLMessageSystem *mesgsys, void ** /*use
mesgsys->getU64Fast(_PREHASH_XferID, _PREHASH_ID, id);
mesgsys->getS32Fast(_PREHASH_XferID, _PREHASH_Packet, packetNum);
- LLXfer* xferp = findXfer(id, mSendList);
+ LLXfer* xferp = findXferByID(id, mSendList);
if (xferp)
{
// cout << "confirmed packet #" << packetNum << " ping: "<< xferp->ACKTimer.getElapsedTimeF32() << endl;
@@ -1002,91 +1031,105 @@ void LLXferManager::processConfirmation (LLMessageSystem *mesgsys, void ** /*use
}
else
{
- removeXfer(xferp, &mSendList);
+ removeXfer(xferp, mSendList);
}
}
}
///////////////////////////////////////////////////////////
-void LLXferManager::retransmitUnackedPackets ()
+// Called from LLMessageSystem::processAcks()
+void LLXferManager::retransmitUnackedPackets()
{
LLXfer *xferp;
- LLXfer *delp;
- xferp = mReceiveList;
- while(xferp)
+
+ xfer_list_t::iterator iter = mReceiveList.begin();
+ while (iter != mReceiveList.end())
{
+ xferp = (*iter);
if (xferp->mStatus == e_LL_XFER_IN_PROGRESS)
{
// if the circuit dies, abort
if (! gMessageSystem->mCircuitInfo.isCircuitAlive( xferp->mRemoteHost ))
{
- LL_INFOS() << "Xfer found in progress on dead circuit, aborting" << LL_ENDL;
+ LL_WARNS("Xfer") << "Xfer found in progress on dead circuit, aborting transfer to "
+ << xferp->mRemoteHost.getIPandPort()
+ << LL_ENDL;
xferp->mCallbackResult = LL_ERR_CIRCUIT_GONE;
xferp->processEOF();
- delp = xferp;
- xferp = xferp->mNext;
- removeXfer(delp,&mReceiveList);
+
+ iter = mReceiveList.erase(iter); // iter is set to next one after the deletion point
+ delete (xferp);
continue;
}
}
- xferp = xferp->mNext;
+ ++iter;
}
- xferp = mSendList;
+ // Re-build mOutgoingHosts data
updateHostStatus();
+
F32 et;
- while (xferp)
+ iter = mSendList.begin();
+ while (iter != mSendList.end())
{
+ xferp = (*iter);
if (xferp->mWaitingForACK && ( (et = xferp->ACKTimer.getElapsedTimeF32()) > LL_PACKET_TIMEOUT))
{
if (xferp->mRetries > LL_PACKET_RETRY_LIMIT)
{
- LL_INFOS() << "dropping xfer " << xferp->mRemoteHost << ":" << xferp->getFileName() << " packet retransmit limit exceeded, xfer dropped" << LL_ENDL;
+ LL_INFOS("Xfer") << "dropping xfer " << xferp->mRemoteHost << ":" << xferp->getFileName() << " packet retransmit limit exceeded, xfer dropped" << LL_ENDL;
xferp->abort(LL_ERR_TCP_TIMEOUT);
- delp = xferp;
- xferp = xferp->mNext;
- removeXfer(delp,&mSendList);
+ iter = mSendList.erase(iter);
+ delete xferp;
+ continue;
}
else
{
- LL_INFOS() << "resending xfer " << xferp->mRemoteHost << ":" << xferp->getFileName() << " packet unconfirmed after: "<< et << " sec, packet " << xferp->mPacketNum << LL_ENDL;
+ LL_INFOS("Xfer") << "resending xfer " << xferp->mRemoteHost << ":" << xferp->getFileName() << " packet unconfirmed after: "<< et << " sec, packet " << xferp->mPacketNum << LL_ENDL;
xferp->resendLastPacket();
- xferp = xferp->mNext;
}
}
else if ((xferp->mStatus == e_LL_XFER_REGISTERED) && ( (et = xferp->ACKTimer.getElapsedTimeF32()) > LL_XFER_REGISTRATION_TIMEOUT))
{
- LL_INFOS() << "registered xfer never requested, xfer dropped" << LL_ENDL;
+ LL_INFOS("Xfer") << "registered xfer never requested, xfer dropped" << LL_ENDL;
xferp->abort(LL_ERR_TCP_TIMEOUT);
- delp = xferp;
- xferp = xferp->mNext;
- removeXfer(delp,&mSendList);
+ iter = mSendList.erase(iter);
+ delete xferp;
+ continue;
}
else if (xferp->mStatus == e_LL_XFER_ABORTED)
{
- LL_WARNS() << "Removing aborted xfer " << xferp->mRemoteHost << ":" << xferp->getFileName() << LL_ENDL;
- delp = xferp;
- xferp = xferp->mNext;
- removeXfer(delp,&mSendList);
+ LL_WARNS("Xfer") << "Removing aborted xfer " << xferp->mRemoteHost << ":" << xferp->getFileName() << LL_ENDL;
+ iter = mSendList.erase(iter);
+ delete xferp;
+ continue;
}
else if (xferp->mStatus == e_LL_XFER_PENDING)
{
-// LL_INFOS() << "*** numActiveXfers = " << numActiveXfers(xferp->mRemoteHost) << " mMaxOutgoingXfersPerCircuit = " << mMaxOutgoingXfersPerCircuit << LL_ENDL;
+// LL_INFOS("Xfer") << "*** numActiveXfers = " << numActiveXfers(xferp->mRemoteHost) << " mMaxOutgoingXfersPerCircuit = " << mMaxOutgoingXfersPerCircuit << LL_ENDL;
if (numActiveXfers(xferp->mRemoteHost) < mMaxOutgoingXfersPerCircuit)
{
-// LL_INFOS() << "bumping pending xfer to active" << LL_ENDL;
- xferp->sendNextPacket();
- changeNumActiveXfers(xferp->mRemoteHost,1);
- }
- xferp = xferp->mNext;
- }
- else
- {
- xferp = xferp->mNext;
+ if (xferp->reopenFileHandle())
+ {
+ LL_WARNS("Xfer") << "Error re-opening file handle for xfer ID " << U64_to_str(xferp->mID)
+ << " to host " << xferp->mRemoteHost << LL_ENDL;
+ xferp->abort(LL_ERR_CANNOT_OPEN_FILE);
+ iter = mSendList.erase(iter);
+ delete xferp;
+ continue;
+ }
+ else
+ { // No error re-opening the file, send the first packet
+ LL_DEBUGS("Xfer") << "Moving pending xfer ID " << U64_to_str(xferp->mID) << " to active" << LL_ENDL;
+ xferp->sendNextPacket();
+ changeNumActiveXfers(xferp->mRemoteHost,1);
+ }
+ }
}
- }
+ ++iter;
+ } // end while() loop
//
// HACK - if we're using xfer confirm throttling, throttle our xfer confirms here
@@ -1099,10 +1142,10 @@ void LLXferManager::retransmitUnackedPackets ()
{
break;
}
- //LL_INFOS() << "Confirm packet queue length:" << mXferAckQueue.size() << LL_ENDL;
+ //LL_INFOS("Xfer") << "Confirm packet queue length:" << mXferAckQueue.size() << LL_ENDL;
LLXferAckInfo ack_info = mXferAckQueue.front();
mXferAckQueue.pop_front();
- //LL_INFOS() << "Sending confirm packet" << LL_ENDL;
+ //LL_INFOS("Xfer") << "Sending confirm packet" << LL_ENDL;
sendConfirmPacket(gMessageSystem, ack_info.mID, ack_info.mPacketNum, ack_info.mRemoteHost);
mAckThrottle.throttleOverflow(1000.f*8.f); // Assume 1000 bytes/packet
}
@@ -1112,7 +1155,7 @@ void LLXferManager::retransmitUnackedPackets ()
void LLXferManager::abortRequestById(U64 xfer_id, S32 result_code)
{
- LLXfer * xferp = findXfer(xfer_id, mReceiveList);
+ LLXfer * xferp = findXferByID(xfer_id, mReceiveList);
if (xferp)
{
if (xferp->mStatus == e_LL_XFER_IN_PROGRESS)
@@ -1124,7 +1167,7 @@ void LLXferManager::abortRequestById(U64 xfer_id, S32 result_code)
{
xferp->mCallbackResult = result_code;
xferp->processEOF(); //should notify requester
- removeXfer(xferp, &mReceiveList);
+ removeXfer(xferp, mReceiveList);
}
// Since already removed or marked as aborted no need
// to wait for processAbort() to start new download
@@ -1143,12 +1186,12 @@ void LLXferManager::processAbort (LLMessageSystem *mesgsys, void ** /*user_data*
mesgsys->getU64Fast(_PREHASH_XferID, _PREHASH_ID, id);
mesgsys->getS32Fast(_PREHASH_XferID, _PREHASH_Result, result_code);
- xferp = findXfer(id, mReceiveList);
+ xferp = findXferByID(id, mReceiveList);
if (xferp)
{
xferp->mCallbackResult = result_code;
xferp->processEOF();
- removeXfer(xferp, &mReceiveList);
+ removeXfer(xferp, mReceiveList);
startPendingDownloads();
}
}
@@ -1164,27 +1207,29 @@ void LLXferManager::startPendingDownloads()
// requests get pushed toward the back. Thus, if we didn't do a
// stateful iteration, it would be possible for old requests to
// never start.
- LLXfer* xferp = mReceiveList;
+ LLXfer* xferp;
std::list<LLXfer*> pending_downloads;
S32 download_count = 0;
S32 pending_count = 0;
- while(xferp)
+ for (xfer_list_t::iterator iter = mReceiveList.begin();
+ iter != mReceiveList.end();
+ ++iter)
{
+ xferp = (*iter);
if(xferp->mStatus == e_LL_XFER_PENDING)
- {
+ { // Count and accumulate pending downloads
++pending_count;
pending_downloads.push_front(xferp);
}
else if(xferp->mStatus == e_LL_XFER_IN_PROGRESS)
- {
+ { // Count downloads in progress
++download_count;
}
- xferp = xferp->mNext;
}
S32 start_count = mMaxIncomingXfers - download_count;
- LL_DEBUGS() << "LLXferManager::startPendingDownloads() - XFER_IN_PROGRESS: "
+ LL_DEBUGS("Xfer") << "LLXferManager::startPendingDownloads() - XFER_IN_PROGRESS: "
<< download_count << " XFER_PENDING: " << pending_count
<< " startring " << llmin(start_count, pending_count) << LL_ENDL;
@@ -1209,29 +1254,15 @@ void LLXferManager::startPendingDownloads()
///////////////////////////////////////////////////////////
-void LLXferManager::addToList(LLXfer* xferp, LLXfer*& head, BOOL is_priority)
+void LLXferManager::addToList(LLXfer* xferp, xfer_list_t & xfer_list, BOOL is_priority)
{
if(is_priority)
{
- xferp->mNext = NULL;
- LLXfer* next = head;
- if(next)
- {
- while(next->mNext)
- {
- next = next->mNext;
- }
- next->mNext = xferp;
- }
- else
- {
- head = xferp;
- }
+ xfer_list.push_back(xferp);
}
else
{
- xferp->mNext = head;
- head = xferp;
+ xfer_list.push_front(xferp);
}
}
diff --git a/indra/llmessage/llxfermanager.h b/indra/llmessage/llxfermanager.h
index d258f0a5ce..45ae2ffdd3 100644
--- a/indra/llmessage/llxfermanager.h
+++ b/indra/llmessage/llxfermanager.h
@@ -77,6 +77,7 @@ class LLXferManager
protected:
S32 mMaxOutgoingXfersPerCircuit;
+ S32 mHardLimitOutgoingXfersPerCircuit; // At this limit, kill off the connection
S32 mMaxIncomingXfers;
BOOL mUseAckThrottling; // Use ack throttling to cap file xfer bandwidth
@@ -92,19 +93,22 @@ class LLXferManager
HIGH_PRIORITY = TRUE,
};
- LLXfer *mSendList;
- LLXfer *mReceiveList;
+ // Linked FIFO list, add to the front and pull from back
+ typedef std::deque<LLXfer *> xfer_list_t;
+ xfer_list_t mSendList;
+ xfer_list_t mReceiveList;
typedef std::list<LLHostStatus*> status_list_t;
status_list_t mOutgoingHosts;
- private:
protected:
// implementation methods
virtual void startPendingDownloads();
- virtual void addToList(LLXfer* xferp, LLXfer*& head, BOOL is_priority);
+ virtual void addToList(LLXfer* xferp, xfer_list_t & xfer_list, BOOL is_priority);
std::multiset<std::string> mExpectedTransfers; // files that are authorized to transfer out
std::multiset<std::string> mExpectedRequests; // files that are authorized to be downloaded on top of
+ std::multiset<std::string> mExpectedVFileTransfers; // files that are authorized to transfer out
+ std::multiset<std::string> mExpectedVFileRequests; // files that are authorized to be downloaded on top of
public:
LLXferManager(LLVFS *vfs);
@@ -117,14 +121,17 @@ class LLXferManager
void setAckThrottleBPS(const F32 bps);
// list management routines
- virtual LLXfer *findXfer(U64 id, LLXfer *list_head);
- virtual void removeXfer (LLXfer *delp, LLXfer **list_head);
- virtual U32 numActiveListEntries(LLXfer *list_head);
+ virtual LLXfer *findXferByID(U64 id, xfer_list_t & xfer_list);
+ virtual void removeXfer (LLXfer *delp, xfer_list_t & xfer_list);
+
+ LLHostStatus * findHostStatus(const LLHost &host);
virtual S32 numActiveXfers(const LLHost &host);
virtual S32 numPendingXfers(const LLHost &host);
+
virtual void changeNumActiveXfers(const LLHost &host, S32 delta);
virtual void setMaxOutgoingXfersPerCircuit (S32 max_num);
+ virtual void setHardLimitOutgoingXfersPerCircuit(S32 max_num);
virtual void setMaxIncomingXfers(S32 max_num);
virtual void updateHostStatus();
virtual void printHostStatus();
@@ -136,8 +143,6 @@ class LLXferManager
virtual S32 decodePacketNum(S32 packet_num);
virtual BOOL isLastPacket(S32 packet_num);
- virtual U64 registerXfer(const void *datap, const S32 length);
-
// file requesting routines
// .. to file
virtual U64 requestFile(const std::string& local_filename,
@@ -148,7 +153,7 @@ class LLXferManager
void (*callback)(void**,S32,LLExtStat), void** user_data,
BOOL is_priority = FALSE,
BOOL use_big_packets = FALSE);
-
+ /*
// .. to memory
virtual void requestFile(const std::string& remote_filename,
ELLPath remote_path,
@@ -157,7 +162,7 @@ class LLXferManager
void (*callback)(void*, S32, void**, S32, LLExtStat),
void** user_data,
BOOL is_priority = FALSE);
-
+ */
// vfile requesting
// .. to vfile
virtual void requestVFile(const LLUUID &local_id, const LLUUID& remote_id,
@@ -180,18 +185,15 @@ class LLXferManager
virtual void expectFileForRequest(const std::string& filename);
virtual bool validateFileForRequest(const std::string& filename);
-/*
-// xfer request (may be memory or file)
-// .. to file
- virtual void requestXfer(const char *local_filename, U64 xfer_id,
- BOOL delete_remote_on_completion,
- const LLHost &remote_host, void (*callback)(void **,S32),void **user_data);
-// .. to memory
- virtual void requestXfer(U64 xfer_id,
- const LLHost &remote_host,
- BOOL delete_remote_on_completion,
- void (*callback)(void *, S32, void **, S32),void **user_data);
-*/
+ /**
+ Same idea but for VFiles, kept separate to avoid namespace overlap
+ */
+ /* Present in fireengine, not used by viewer
+ virtual void expectVFileForTransfer(const std::string& filename);
+ virtual bool validateVFileForTransfer(const std::string& filename);
+ virtual void expectVFileForRequest(const std::string& filename);
+ virtual bool validateVFileForRequest(const std::string& filename);
+ */
virtual void processReceiveData (LLMessageSystem *mesgsys, void **user_data);
virtual void sendConfirmPacket (LLMessageSystem *mesgsys, U64 id, S32 packetnum, const LLHost &remote_host);
@@ -204,6 +206,8 @@ class LLXferManager
// error handling
void abortRequestById(U64 xfer_id, S32 result_code);
virtual void processAbort (LLMessageSystem *mesgsys, void **user_data);
+
+ virtual bool isHostFlooded(const LLHost & host);
};
extern LLXferManager* gXferManager;