summaryrefslogtreecommitdiff
path: root/indra/llmessage
diff options
context:
space:
mode:
authorNat Goodspeed <nat@lindenlab.com>2012-06-07 05:47:50 -0400
committerNat Goodspeed <nat@lindenlab.com>2012-06-07 05:47:50 -0400
commitce563795e1f5d7493b975393bea9ec5cab90fd6a (patch)
treeedc0b403ee80989bfa55b6d1cae3680093fb61fc /indra/llmessage
parentd167ebe35f8cdec1ca88e0d817e2878f14a5aa68 (diff)
parent89ea7ccfc7fd4c33eab4ad9123141fa40231a00d (diff)
MAINT-1144: Merge llhttpclient_test.cpp fix back to tip
Diffstat (limited to 'indra/llmessage')
-rw-r--r--indra/llmessage/llassetstorage.cpp30
-rw-r--r--indra/llmessage/llbuffer.cpp90
-rw-r--r--indra/llmessage/llbuffer.h25
-rw-r--r--indra/llmessage/llbufferstream.cpp8
-rw-r--r--indra/llmessage/llcurl.cpp676
-rw-r--r--[-rwxr-xr-x]indra/llmessage/llcurl.h137
-rw-r--r--indra/llmessage/llhttpassetstorage.cpp10
-rw-r--r--indra/llmessage/llhttpclient.cpp14
-rw-r--r--indra/llmessage/lliohttpserver.cpp4
-rw-r--r--indra/llmessage/lliopipe.cpp6
-rw-r--r--indra/llmessage/lliopipe.h2
-rw-r--r--indra/llmessage/lliosocket.cpp3
-rw-r--r--indra/llmessage/llmime.cpp4
-rw-r--r--indra/llmessage/llpumpio.cpp51
-rw-r--r--indra/llmessage/llpumpio.h13
-rw-r--r--indra/llmessage/llsdmessage.cpp2
-rw-r--r--indra/llmessage/llsdmessagebuilder.cpp2
-rw-r--r--indra/llmessage/llsdmessagereader.cpp3
-rw-r--r--indra/llmessage/llsdrpcclient.h22
-rw-r--r--indra/llmessage/llurlrequest.cpp36
-rw-r--r--indra/llmessage/llurlrequest.h2
-rw-r--r--indra/llmessage/llxfer.h1
-rw-r--r--indra/llmessage/message.cpp6
-rw-r--r--indra/llmessage/tests/testrunner.py2
24 files changed, 898 insertions, 251 deletions
diff --git a/indra/llmessage/llassetstorage.cpp b/indra/llmessage/llassetstorage.cpp
index 31cdb1219b..9b86daebe5 100644
--- a/indra/llmessage/llassetstorage.cpp
+++ b/indra/llmessage/llassetstorage.cpp
@@ -149,8 +149,8 @@ void LLAssetInfo::setFromNameValue( const LLNameValue& nv )
setName( buf );
buf.assign( str, pos2, std::string::npos );
setDescription( buf );
- llinfos << "uuid: " << mUuid << llendl;
- llinfos << "creator: " << mCreatorID << llendl;
+ LL_DEBUGS("AssetStorage") << "uuid: " << mUuid << llendl;
+ LL_DEBUGS("AssetStorage") << "creator: " << mCreatorID << llendl;
}
///----------------------------------------------------------------------------
@@ -434,9 +434,9 @@ bool LLAssetStorage::findInStaticVFSAndInvokeCallback(const LLUUID& uuid, LLAsse
// IW - uuid is passed by value to avoid side effects, please don't re-add &
void LLAssetStorage::getAssetData(const LLUUID uuid, LLAssetType::EType type, LLGetAssetCallback callback, void *user_data, BOOL is_priority)
{
- lldebugs << "LLAssetStorage::getAssetData() - " << uuid << "," << LLAssetType::lookup(type) << llendl;
+ LL_DEBUGS("AssetStorage") << "LLAssetStorage::getAssetData() - " << uuid << "," << LLAssetType::lookup(type) << llendl;
- llinfos << "ASSET_TRACE requesting " << uuid << " type " << LLAssetType::lookup(type) << llendl;
+ LL_DEBUGS("AssetStorage") << "ASSET_TRACE requesting " << uuid << " type " << LLAssetType::lookup(type) << llendl;
if (user_data)
{
@@ -446,7 +446,7 @@ void LLAssetStorage::getAssetData(const LLUUID uuid, LLAssetType::EType type, LL
if (mShutDown)
{
- llinfos << "ASSET_TRACE cancelled " << uuid << " type " << LLAssetType::lookup(type) << " shutting down" << llendl;
+ LL_DEBUGS("AssetStorage") << "ASSET_TRACE cancelled " << uuid << " type " << LLAssetType::lookup(type) << " shutting down" << llendl;
if (callback)
{
@@ -468,7 +468,7 @@ void LLAssetStorage::getAssetData(const LLUUID uuid, LLAssetType::EType type, LL
// Try static VFS first.
if (findInStaticVFSAndInvokeCallback(uuid,type,callback,user_data))
{
- llinfos << "ASSET_TRACE asset " << uuid << " found in static VFS" << llendl;
+ LL_DEBUGS("AssetStorage") << "ASSET_TRACE asset " << uuid << " found in static VFS" << llendl;
return;
}
@@ -486,7 +486,7 @@ void LLAssetStorage::getAssetData(const LLUUID uuid, LLAssetType::EType type, LL
callback(mVFS, uuid, type, user_data, LL_ERR_NOERR, LL_EXSTAT_VFS_CACHED);
}
- llinfos << "ASSET_TRACE asset " << uuid << " found in VFS" << llendl;
+ LL_DEBUGS("AssetStorage") << "ASSET_TRACE asset " << uuid << " found in VFS" << llendl;
}
else
{
@@ -520,7 +520,7 @@ void LLAssetStorage::getAssetData(const LLUUID uuid, LLAssetType::EType type, LL
}
if (duplicate)
{
- llinfos << "Adding additional non-duplicate request for asset " << uuid
+ LL_DEBUGS("AssetStorage") << "Adding additional non-duplicate request for asset " << uuid
<< "." << LLAssetType::lookup(type) << llendl;
}
@@ -584,9 +584,9 @@ void LLAssetStorage::downloadCompleteCallback(
LLAssetType::EType file_type,
void* user_data, LLExtStat ext_status)
{
- llinfos << "ASSET_TRACE asset " << file_id << " downloadCompleteCallback" << llendl;
+ LL_DEBUGS("AssetStorage") << "ASSET_TRACE asset " << file_id << " downloadCompleteCallback" << llendl;
- lldebugs << "LLAssetStorage::downloadCompleteCallback() for " << file_id
+ LL_DEBUGS("AssetStorage") << "LLAssetStorage::downloadCompleteCallback() for " << file_id
<< "," << LLAssetType::lookup(file_type) << llendl;
LLAssetRequest* req = (LLAssetRequest*)user_data;
if(!req)
@@ -731,7 +731,7 @@ void LLAssetStorage::getEstateAsset(const LLHost &object_sim, const LLUUID &agen
tpvf.setAsset(asset_id, atype);
tpvf.setCallback(downloadEstateAssetCompleteCallback, req);
- llinfos << "Starting transfer for " << asset_id << llendl;
+ LL_DEBUGS("AssetStorage") << "Starting transfer for " << asset_id << llendl;
LLTransferTargetChannel *ttcp = gTransferManager.getTargetChannel(source_host, LLTCT_ASSET);
ttcp->requestTransfer(spe, tpvf, 100.f + (is_priority ? 1.f : 0.f));
}
@@ -871,7 +871,7 @@ void LLAssetStorage::getInvItemAsset(const LLHost &object_sim, const LLUUID &age
tpvf.setAsset(asset_id, atype);
tpvf.setCallback(downloadInvItemCompleteCallback, req);
- llinfos << "Starting transfer for inventory asset "
+ LL_DEBUGS("AssetStorage") << "Starting transfer for inventory asset "
<< item_id << " owned by " << owner_id << "," << task_id
<< llendl;
LLTransferTargetChannel *ttcp = gTransferManager.getTargetChannel(source_host, LLTCT_ASSET);
@@ -1211,7 +1211,7 @@ bool LLAssetStorage::deletePendingRequest(LLAssetStorage::ERequestType rt,
request_list_t* requests = getRequestList(rt);
if (deletePendingRequestImpl(requests, asset_type, asset_id))
{
- llinfos << "Asset " << getRequestName(rt) << " request for "
+ LL_DEBUGS("AssetStorage") << "Asset " << getRequestName(rt) << " request for "
<< asset_id << "." << LLAssetType::lookup(asset_type)
<< " removed from pending queue." << llendl;
return true;
@@ -1307,7 +1307,7 @@ void LLAssetStorage::getAssetData(const LLUUID uuid, LLAssetType::EType type, vo
user_data == ((LLLegacyAssetRequest *)tmp->mUserData)->mUserData)
{
// this is a duplicate from the same subsystem - throw it away
- llinfos << "Discarding duplicate request for UUID " << uuid << llendl;
+ LL_DEBUGS("AssetStorage") << "Discarding duplicate request for UUID " << uuid << llendl;
return;
}
}
@@ -1490,7 +1490,7 @@ void LLAssetStorage::reportMetric( const LLUUID& asset_id, const LLAssetType::ET
{
if( !metric_recipient )
{
- llinfos << "Couldn't store LLAssetStoreage::reportMetric - no metrics_recipient" << llendl;
+ LL_DEBUGS("AssetStorage") << "Couldn't store LLAssetStoreage::reportMetric - no metrics_recipient" << llendl;
return;
}
diff --git a/indra/llmessage/llbuffer.cpp b/indra/llmessage/llbuffer.cpp
index 0316797f00..250cace6e9 100644
--- a/indra/llmessage/llbuffer.cpp
+++ b/indra/llmessage/llbuffer.cpp
@@ -32,6 +32,9 @@
#include "llmath.h"
#include "llmemtype.h"
#include "llstl.h"
+#include "llthread.h"
+
+#define ASSERT_LLBUFFERARRAY_MUTEX_LOCKED llassert(!mMutexp || mMutexp->isSelfLocked());
/**
* LLSegment
@@ -224,7 +227,8 @@ void LLHeapBuffer::allocate(S32 size)
* LLBufferArray
*/
LLBufferArray::LLBufferArray() :
- mNextBaseChannel(0)
+ mNextBaseChannel(0),
+ mMutexp(NULL)
{
LLMemType m1(LLMemType::MTYPE_IO_BUFFER);
}
@@ -233,6 +237,8 @@ LLBufferArray::~LLBufferArray()
{
LLMemType m1(LLMemType::MTYPE_IO_BUFFER);
std::for_each(mBuffers.begin(), mBuffers.end(), DeletePointer());
+
+ delete mMutexp;
}
// static
@@ -243,14 +249,57 @@ LLChannelDescriptors LLBufferArray::makeChannelConsumer(
return rv;
}
+void LLBufferArray::lock()
+{
+ if(mMutexp)
+ {
+ mMutexp->lock() ;
+ }
+}
+
+void LLBufferArray::unlock()
+{
+ if(mMutexp)
+ {
+ mMutexp->unlock() ;
+ }
+}
+
+LLMutex* LLBufferArray::getMutex()
+{
+ return mMutexp ;
+}
+
+void LLBufferArray::setThreaded(bool threaded)
+{
+ if(threaded)
+ {
+ if(!mMutexp)
+ {
+ mMutexp = new LLMutex(NULL);
+ }
+ }
+ else
+ {
+ if(mMutexp)
+ {
+ delete mMutexp ;
+ mMutexp = NULL ;
+ }
+ }
+}
+
LLChannelDescriptors LLBufferArray::nextChannel()
{
LLChannelDescriptors rv(mNextBaseChannel++);
return rv;
}
+//mMutexp should be locked before calling this.
S32 LLBufferArray::capacity() const
{
+ ASSERT_LLBUFFERARRAY_MUTEX_LOCKED
+
S32 total = 0;
const_buffer_iterator_t iter = mBuffers.begin();
const_buffer_iterator_t end = mBuffers.end();
@@ -263,6 +312,8 @@ S32 LLBufferArray::capacity() const
bool LLBufferArray::append(S32 channel, const U8* src, S32 len)
{
+ LLMutexLock lock(mMutexp) ;
+
LLMemType m1(LLMemType::MTYPE_IO_BUFFER);
std::vector<LLSegment> segments;
if(copyIntoBuffers(channel, src, len, segments))
@@ -273,8 +324,11 @@ bool LLBufferArray::append(S32 channel, const U8* src, S32 len)
return false;
}
+//mMutexp should be locked before calling this.
bool LLBufferArray::prepend(S32 channel, const U8* src, S32 len)
{
+ ASSERT_LLBUFFERARRAY_MUTEX_LOCKED
+
LLMemType m1(LLMemType::MTYPE_IO_BUFFER);
std::vector<LLSegment> segments;
if(copyIntoBuffers(channel, src, len, segments))
@@ -293,6 +347,8 @@ bool LLBufferArray::insertAfter(
{
LLMemType m1(LLMemType::MTYPE_IO_BUFFER);
std::vector<LLSegment> segments;
+
+ LLMutexLock lock(mMutexp) ;
if(mSegments.end() != segment)
{
++segment;
@@ -305,8 +361,11 @@ bool LLBufferArray::insertAfter(
return false;
}
+//mMutexp should be locked before calling this.
LLBufferArray::segment_iterator_t LLBufferArray::splitAfter(U8* address)
{
+ ASSERT_LLBUFFERARRAY_MUTEX_LOCKED
+
LLMemType m1(LLMemType::MTYPE_IO_BUFFER);
segment_iterator_t end = mSegments.end();
segment_iterator_t it = getSegment(address);
@@ -335,20 +394,26 @@ LLBufferArray::segment_iterator_t LLBufferArray::splitAfter(U8* address)
return rv;
}
+//mMutexp should be locked before calling this.
LLBufferArray::segment_iterator_t LLBufferArray::beginSegment()
{
+ ASSERT_LLBUFFERARRAY_MUTEX_LOCKED
return mSegments.begin();
}
+//mMutexp should be locked before calling this.
LLBufferArray::segment_iterator_t LLBufferArray::endSegment()
{
+ ASSERT_LLBUFFERARRAY_MUTEX_LOCKED
return mSegments.end();
}
+//mMutexp should be locked before calling this.
LLBufferArray::segment_iterator_t LLBufferArray::constructSegmentAfter(
U8* address,
LLSegment& segment)
{
+ ASSERT_LLBUFFERARRAY_MUTEX_LOCKED
LLMemType m1(LLMemType::MTYPE_IO_BUFFER);
segment_iterator_t rv = mSegments.begin();
segment_iterator_t end = mSegments.end();
@@ -395,8 +460,10 @@ LLBufferArray::segment_iterator_t LLBufferArray::constructSegmentAfter(
return rv;
}
+//mMutexp should be locked before calling this.
LLBufferArray::segment_iterator_t LLBufferArray::getSegment(U8* address)
{
+ ASSERT_LLBUFFERARRAY_MUTEX_LOCKED
segment_iterator_t end = mSegments.end();
if(!address)
{
@@ -414,9 +481,11 @@ LLBufferArray::segment_iterator_t LLBufferArray::getSegment(U8* address)
return end;
}
+//mMutexp should be locked before calling this.
LLBufferArray::const_segment_iterator_t LLBufferArray::getSegment(
U8* address) const
{
+ ASSERT_LLBUFFERARRAY_MUTEX_LOCKED
const_segment_iterator_t end = mSegments.end();
if(!address)
{
@@ -466,6 +535,8 @@ S32 LLBufferArray::countAfter(S32 channel, U8* start) const
S32 count = 0;
S32 offset = 0;
const_segment_iterator_t it;
+
+ LLMutexLock lock(mMutexp) ;
const_segment_iterator_t end = mSegments.end();
if(start)
{
@@ -517,6 +588,8 @@ U8* LLBufferArray::readAfter(
len = 0;
S32 bytes_to_copy = 0;
const_segment_iterator_t it;
+
+ LLMutexLock lock(mMutexp) ;
const_segment_iterator_t end = mSegments.end();
if(start)
{
@@ -568,6 +641,7 @@ U8* LLBufferArray::seek(
U8* start,
S32 delta) const
{
+ ASSERT_LLBUFFERARRAY_MUTEX_LOCKED
LLMemType m1(LLMemType::MTYPE_IO_BUFFER);
const_segment_iterator_t it;
const_segment_iterator_t end = mSegments.end();
@@ -709,9 +783,14 @@ U8* LLBufferArray::seek(
return rv;
}
+//test use only
bool LLBufferArray::takeContents(LLBufferArray& source)
{
LLMemType m1(LLMemType::MTYPE_IO_BUFFER);
+
+ LLMutexLock lock(mMutexp);
+ source.lock();
+
std::copy(
source.mBuffers.begin(),
source.mBuffers.end(),
@@ -723,13 +802,17 @@ bool LLBufferArray::takeContents(LLBufferArray& source)
std::back_insert_iterator<segment_list_t>(mSegments));
source.mSegments.clear();
source.mNextBaseChannel = 0;
+ source.unlock();
+
return true;
}
+//mMutexp should be locked before calling this.
LLBufferArray::segment_iterator_t LLBufferArray::makeSegment(
S32 channel,
S32 len)
{
+ ASSERT_LLBUFFERARRAY_MUTEX_LOCKED
LLMemType m1(LLMemType::MTYPE_IO_BUFFER);
// start at the end of the buffers, because it is the most likely
// to have free space.
@@ -765,8 +848,10 @@ LLBufferArray::segment_iterator_t LLBufferArray::makeSegment(
return send;
}
+//mMutexp should be locked before calling this.
bool LLBufferArray::eraseSegment(const segment_iterator_t& erase_iter)
{
+ ASSERT_LLBUFFERARRAY_MUTEX_LOCKED
LLMemType m1(LLMemType::MTYPE_IO_BUFFER);
// Find out which buffer contains the segment, and if it is found,
@@ -792,13 +877,14 @@ bool LLBufferArray::eraseSegment(const segment_iterator_t& erase_iter)
return rv;
}
-
+//mMutexp should be locked before calling this.
bool LLBufferArray::copyIntoBuffers(
S32 channel,
const U8* src,
S32 len,
std::vector<LLSegment>& segments)
{
+ ASSERT_LLBUFFERARRAY_MUTEX_LOCKED
LLMemType m1(LLMemType::MTYPE_IO_BUFFER);
if(!src || !len) return false;
S32 copied = 0;
diff --git a/indra/llmessage/llbuffer.h b/indra/llmessage/llbuffer.h
index 1c42b6fbc6..ccdb9fa7ee 100644
--- a/indra/llmessage/llbuffer.h
+++ b/indra/llmessage/llbuffer.h
@@ -39,6 +39,7 @@
#include <list>
#include <vector>
+class LLMutex;
/**
* @class LLChannelDescriptors
* @brief A way simple interface to accesss channels inside a buffer
@@ -564,6 +565,29 @@ public:
* @return Returns true on success.
*/
bool eraseSegment(const segment_iterator_t& iter);
+
+ /**
+ * @brief Lock the mutex if it exists
+ * This method locks mMutexp to make accessing LLBufferArray thread-safe
+ */
+ void lock();
+
+ /**
+ * @brief Unlock the mutex if it exists
+ */
+ void unlock();
+
+ /**
+ * @brief Return mMutexp
+ */
+ LLMutex* getMutex();
+
+ /**
+ * @brief Set LLBufferArray to be shared across threads or not
+ * This method is to create mMutexp if is threaded.
+ * @param threaded Indicates this LLBufferArray instance is shared across threads if true.
+ */
+ void setThreaded(bool threaded);
//@}
protected:
@@ -595,6 +619,7 @@ protected:
S32 mNextBaseChannel;
buffer_list_t mBuffers;
segment_list_t mSegments;
+ LLMutex* mMutexp;
};
#endif // LL_LLBUFFER_H
diff --git a/indra/llmessage/llbufferstream.cpp b/indra/llmessage/llbufferstream.cpp
index 6257983c43..8d8ad05ad5 100644
--- a/indra/llmessage/llbufferstream.cpp
+++ b/indra/llmessage/llbufferstream.cpp
@@ -31,6 +31,7 @@
#include "llbuffer.h"
#include "llmemtype.h"
+#include "llthread.h"
static const S32 DEFAULT_OUTPUT_SEGMENT_SIZE = 1024 * 4;
@@ -62,6 +63,7 @@ int LLBufferStreamBuf::underflow()
return EOF;
}
+ LLMutexLock lock(mBuffer->getMutex());
LLBufferArray::segment_iterator_t iter;
LLBufferArray::segment_iterator_t end = mBuffer->endSegment();
U8* last_pos = (U8*)gptr();
@@ -149,6 +151,7 @@ int LLBufferStreamBuf::overflow(int c)
// since we got here, we have a buffer, and we have a character to
// put on it.
LLBufferArray::segment_iterator_t it;
+ LLMutexLock lock(mBuffer->getMutex());
it = mBuffer->makeSegment(mChannels.out(), DEFAULT_OUTPUT_SEGMENT_SIZE);
if(it != mBuffer->endSegment())
{
@@ -210,6 +213,7 @@ int LLBufferStreamBuf::sync()
// *NOTE: I bet we could just --address if address is not NULL.
// Need to think about that.
+ LLMutexLock lock(mBuffer->getMutex());
address = mBuffer->seek(mChannels.out(), address, -1);
if(address)
{
@@ -273,6 +277,8 @@ streampos LLBufferStreamBuf::seekoff(
// NULL is fine
break;
}
+
+ LLMutexLock lock(mBuffer->getMutex());
address = mBuffer->seek(mChannels.in(), base_addr, off);
if(address)
{
@@ -304,6 +310,8 @@ streampos LLBufferStreamBuf::seekoff(
// NULL is fine
break;
}
+
+ LLMutexLock lock(mBuffer->getMutex());
address = mBuffer->seek(mChannels.out(), base_addr, off);
if(address)
{
diff --git a/indra/llmessage/llcurl.cpp b/indra/llmessage/llcurl.cpp
index 330028c926..b93d429feb 100644
--- a/indra/llmessage/llcurl.cpp
+++ b/indra/llmessage/llcurl.cpp
@@ -72,10 +72,9 @@
static const U32 EASY_HANDLE_POOL_SIZE = 5;
static const S32 MULTI_PERFORM_CALL_REPEAT = 5;
-static const S32 CURL_REQUEST_TIMEOUT = 30; // seconds
+static const S32 CURL_REQUEST_TIMEOUT = 30; // seconds per operation
static const S32 MAX_ACTIVE_REQUEST_COUNT = 100;
-static
// DEBUG //
S32 gCurlEasyCount = 0;
S32 gCurlMultiCount = 0;
@@ -86,9 +85,12 @@ S32 gCurlMultiCount = 0;
std::vector<LLMutex*> LLCurl::sSSLMutex;
std::string LLCurl::sCAPath;
std::string LLCurl::sCAFile;
-
-bool LLCurl::sMultiThreaded = false;
-static U32 sMainThreadID = 0;
+LLCurlThread* LLCurl::sCurlThread = NULL ;
+LLMutex* LLCurl::sHandleMutexp = NULL ;
+S32 LLCurl::sTotalHandles = 0 ;
+bool LLCurl::sNotQuitting = true;
+F32 LLCurl::sCurlRequestTimeOut = 120.f; //seonds
+S32 LLCurl::sMaxHandles = 256; //max number of handles, (multi handles and easy handles combined).
void check_curl_code(CURLcode code)
{
@@ -221,17 +223,20 @@ namespace boost
std::set<CURL*> LLCurl::Easy::sFreeHandles;
std::set<CURL*> LLCurl::Easy::sActiveHandles;
-LLMutex* LLCurl::Easy::sHandleMutex = NULL;
-LLMutex* LLCurl::Easy::sMultiMutex = NULL;
+LLMutex* LLCurl::Easy::sHandleMutexp = NULL ;
//static
CURL* LLCurl::Easy::allocEasyHandle()
{
+ llassert(LLCurl::getCurlThread()) ;
+
CURL* ret = NULL;
- LLMutexLock lock(sHandleMutex);
+
+ LLMutexLock lock(sHandleMutexp) ;
+
if (sFreeHandles.empty())
{
- ret = curl_easy_init();
+ ret = LLCurl::newEasyHandle();
}
else
{
@@ -251,20 +256,30 @@ CURL* LLCurl::Easy::allocEasyHandle()
//static
void LLCurl::Easy::releaseEasyHandle(CURL* handle)
{
+ static const S32 MAX_NUM_FREE_HANDLES = 32 ;
+
if (!handle)
{
- llerrs << "handle cannot be NULL!" << llendl;
+ return ; //handle allocation failed.
+ //llerrs << "handle cannot be NULL!" << llendl;
}
- LLMutexLock lock(sHandleMutex);
-
+ LLMutexLock lock(sHandleMutexp) ;
if (sActiveHandles.find(handle) != sActiveHandles.end())
{
sActiveHandles.erase(handle);
+
+ if(sFreeHandles.size() < MAX_NUM_FREE_HANDLES)
+ {
sFreeHandles.insert(handle);
}
else
{
+ LLCurl::deleteEasyHandle(handle) ;
+ }
+ }
+ else
+ {
llerrs << "Invalid handle." << llendl;
}
}
@@ -304,6 +319,14 @@ LLCurl::Easy::~Easy()
--gCurlEasyCount;
curl_slist_free_all(mHeaders);
for_each(mStrings.begin(), mStrings.end(), DeletePointerArray());
+
+ if (mResponder && LLCurl::sNotQuitting) //aborted
+ {
+ std::string reason("Request timeout, aborted.") ;
+ mResponder->completedRaw(408, //HTTP_REQUEST_TIME_OUT, timeout, abort
+ reason, mChannels, mOutput);
+ }
+ mResponder = NULL;
}
void LLCurl::Easy::resetState()
@@ -430,9 +453,9 @@ size_t curlReadCallback(char* data, size_t size, size_t nmemb, void* user_data)
LLCurl::Easy* easy = (LLCurl::Easy*)user_data;
S32 n = size * nmemb;
- S32 startpos = easy->getInput().tellg();
+ S32 startpos = (S32)easy->getInput().tellg();
easy->getInput().seekg(0, std::ios::end);
- S32 endpos = easy->getInput().tellg();
+ S32 endpos = (S32)easy->getInput().tellg();
easy->getInput().seekg(startpos, std::ios::beg);
S32 maxn = endpos - startpos;
n = llmin(n, maxn);
@@ -476,6 +499,7 @@ void LLCurl::Easy::prepRequest(const std::string& url,
LLProxy::getInstance()->applyProxySettings(this);
mOutput.reset(new LLBufferArray);
+ mOutput->setThreaded(true);
setopt(CURLOPT_WRITEFUNCTION, (void*)&curlWriteCallback);
setopt(CURLOPT_WRITEDATA, (void*)this);
@@ -519,48 +543,56 @@ void LLCurl::Easy::prepRequest(const std::string& url,
}
////////////////////////////////////////////////////////////////////////////
-
-LLCurl::Multi::Multi()
- : LLThread("Curl Multi"),
- mQueued(0),
+LLCurl::Multi::Multi(F32 idle_time_out)
+ : mQueued(0),
mErrorCount(0),
- mPerformState(PERFORM_STATE_READY)
+ mState(STATE_READY),
+ mDead(FALSE),
+ mMutexp(NULL),
+ mDeletionMutexp(NULL),
+ mEasyMutexp(NULL)
{
- mQuitting = false;
-
- mThreaded = LLCurl::sMultiThreaded && LLThread::currentID() == sMainThreadID;
- if (mThreaded)
- {
- mSignal = new LLCondition(NULL);
- }
- else
- {
- mSignal = NULL;
- }
-
- mCurlMultiHandle = curl_multi_init();
+ mCurlMultiHandle = LLCurl::newMultiHandle();
if (!mCurlMultiHandle)
{
llwarns << "curl_multi_init() returned NULL! Easy handles: " << gCurlEasyCount << " Multi handles: " << gCurlMultiCount << llendl;
- mCurlMultiHandle = curl_multi_init();
+ mCurlMultiHandle = LLCurl::newMultiHandle();
}
- llassert_always(mCurlMultiHandle);
+ //llassert_always(mCurlMultiHandle);
+
+ if(mCurlMultiHandle)
+ {
+ if(LLCurl::getCurlThread()->getThreaded())
+ {
+ mMutexp = new LLMutex(NULL) ;
+ mDeletionMutexp = new LLMutex(NULL) ;
+ mEasyMutexp = new LLMutex(NULL) ;
+ }
+ LLCurl::getCurlThread()->addMulti(this) ;
+
+ mIdleTimeOut = idle_time_out ;
+ if(mIdleTimeOut < LLCurl::sCurlRequestTimeOut)
+ {
+ mIdleTimeOut = LLCurl::sCurlRequestTimeOut ;
+ }
+
++gCurlMultiCount;
}
+}
LLCurl::Multi::~Multi()
{
- llassert(isStopped());
+ cleanup() ;
+}
- if (LLCurl::sMultiThreaded)
+void LLCurl::Multi::cleanup()
+{
+ if(!mCurlMultiHandle)
{
- LLCurl::Easy::sMultiMutex->lock();
+ return ; //nothing to clean.
}
- delete mSignal;
- mSignal = NULL;
-
// Clean up active
for(easy_active_list_t::iterator iter = mEasyActiveList.begin();
iter != mEasyActiveList.end(); ++iter)
@@ -576,76 +608,157 @@ LLCurl::Multi::~Multi()
for_each(mEasyFreeList.begin(), mEasyFreeList.end(), DeletePointer());
mEasyFreeList.clear();
- check_curl_multi_code(curl_multi_cleanup(mCurlMultiHandle));
+ check_curl_multi_code(LLCurl::deleteMultiHandle(mCurlMultiHandle));
+ mCurlMultiHandle = NULL ;
+
+ delete mMutexp ;
+ mMutexp = NULL ;
+ delete mDeletionMutexp ;
+ mDeletionMutexp = NULL ;
+ delete mEasyMutexp ;
+ mEasyMutexp = NULL ;
+
+ mQueued = 0 ;
+ mState = STATE_COMPLETED;
+
--gCurlMultiCount;
- if (LLCurl::sMultiThreaded)
+ return ;
+}
+
+void LLCurl::Multi::lock()
+{
+ if(mMutexp)
{
- LLCurl::Easy::sMultiMutex->unlock();
+ mMutexp->lock() ;
}
}
-CURLMsg* LLCurl::Multi::info_read(S32* msgs_in_queue)
+void LLCurl::Multi::unlock()
{
- CURLMsg* curlmsg = curl_multi_info_read(mCurlMultiHandle, msgs_in_queue);
- return curlmsg;
+ if(mMutexp)
+ {
+ mMutexp->unlock() ;
+ }
+}
+
+void LLCurl::Multi::markDead()
+{
+ LLMutexLock lock(mDeletionMutexp) ;
+
+ mDead = TRUE ;
+ LLCurl::getCurlThread()->setPriority(mHandle, LLQueuedThread::PRIORITY_URGENT) ;
}
-void LLCurl::Multi::perform()
+void LLCurl::Multi::setState(LLCurl::Multi::ePerformState state)
{
- if (mThreaded)
+ lock() ;
+ mState = state ;
+ unlock() ;
+
+ if(mState == STATE_READY)
{
- if (mPerformState == PERFORM_STATE_READY)
- {
- mSignal->signal();
- }
+ LLCurl::getCurlThread()->setPriority(mHandle, LLQueuedThread::PRIORITY_NORMAL) ;
+ }
+}
+
+LLCurl::Multi::ePerformState LLCurl::Multi::getState()
+{
+ return mState;
+}
+
+bool LLCurl::Multi::isCompleted()
+{
+ return STATE_COMPLETED == getState() ;
+}
+
+bool LLCurl::Multi::waitToComplete()
+{
+ if(!isValid())
+ {
+ return true ;
}
- else
+
+ if(!mMutexp) //not threaded
+ {
+ doPerform() ;
+ return true ;
+ }
+
+ bool completed = (STATE_COMPLETED == mState) ;
+ if(!completed)
{
- doPerform();
+ LLCurl::getCurlThread()->setPriority(mHandle, LLQueuedThread::PRIORITY_HIGH) ;
}
+
+ return completed;
}
-void LLCurl::Multi::run()
+CURLMsg* LLCurl::Multi::info_read(S32* msgs_in_queue)
{
- llassert(mThreaded);
+ LLMutexLock lock(mMutexp) ;
- while (!mQuitting)
- {
- mSignal->wait();
- mPerformState = PERFORM_STATE_PERFORMING;
- if (!mQuitting)
- {
- LLMutexLock lock(LLCurl::Easy::sMultiMutex);
- doPerform();
- }
- }
+ CURLMsg* curlmsg = curl_multi_info_read(mCurlMultiHandle, msgs_in_queue);
+ return curlmsg;
}
-void LLCurl::Multi::doPerform()
+//return true if dead
+bool LLCurl::Multi::doPerform()
{
- S32 q = 0;
- for (S32 call_count = 0;
- call_count < MULTI_PERFORM_CALL_REPEAT;
- call_count += 1)
+ LLMutexLock lock(mDeletionMutexp) ;
+
+ bool dead = mDead ;
+
+ if(mDead)
{
- CURLMcode code = curl_multi_perform(mCurlMultiHandle, &q);
- if (CURLM_CALL_MULTI_PERFORM != code || q == 0)
+ setState(STATE_COMPLETED);
+ mQueued = 0 ;
+ }
+ else if(getState() != STATE_COMPLETED)
+ {
+ setState(STATE_PERFORMING);
+
+ S32 q = 0;
+ for (S32 call_count = 0;
+ call_count < MULTI_PERFORM_CALL_REPEAT;
+ call_count++)
{
- check_curl_multi_code(code);
- break;
+ LLMutexLock lock(mMutexp) ;
+
+ //WARNING: curl_multi_perform will block for many hundreds of milliseconds
+ // NEVER call this from the main thread, and NEVER allow the main thread to
+ // wait on a mutex held by this thread while curl_multi_perform is executing
+ CURLMcode code = curl_multi_perform(mCurlMultiHandle, &q);
+ if (CURLM_CALL_MULTI_PERFORM != code || q == 0)
+ {
+ check_curl_multi_code(code);
+
+ break;
+ }
}
-
+
+ mQueued = q;
+ setState(STATE_COMPLETED) ;
+ mIdleTimer.reset() ;
+ }
+ else if(mIdleTimer.getElapsedTimeF32() > mIdleTimeOut) //idle for too long, remove it.
+ {
+ dead = true ;
}
- mQueued = q;
- mPerformState = PERFORM_STATE_COMPLETED;
+
+ return dead ;
}
S32 LLCurl::Multi::process()
{
- perform();
+ if(!isValid())
+ {
+ return 0 ;
+ }
+
+ waitToComplete() ;
- if (mPerformState != PERFORM_STATE_COMPLETED)
+ if (getState() != STATE_COMPLETED)
{
return 0;
}
@@ -660,10 +773,19 @@ S32 LLCurl::Multi::process()
if (msg->msg == CURLMSG_DONE)
{
U32 response = 0;
- easy_active_map_t::iterator iter = mEasyActiveMap.find(msg->easy_handle);
- if (iter != mEasyActiveMap.end())
+ Easy* easy = NULL ;
+
+ {
+ LLMutexLock lock(mEasyMutexp) ;
+ easy_active_map_t::iterator iter = mEasyActiveMap.find(msg->easy_handle);
+ if (iter != mEasyActiveMap.end())
+ {
+ easy = iter->second;
+ }
+ }
+
+ if(easy)
{
- Easy* easy = iter->second;
response = easy->report(msg->data.result);
removeEasy(easy);
}
@@ -681,25 +803,28 @@ S32 LLCurl::Multi::process()
}
}
- mPerformState = PERFORM_STATE_READY;
+ setState(STATE_READY);
+
return processed;
}
LLCurl::Easy* LLCurl::Multi::allocEasy()
{
- Easy* easy = 0;
+ Easy* easy = 0;
if (mEasyFreeList.empty())
- {
+ {
easy = Easy::getEasy();
}
else
{
+ LLMutexLock lock(mEasyMutexp) ;
easy = *(mEasyFreeList.begin());
mEasyFreeList.erase(easy);
}
if (easy)
{
+ LLMutexLock lock(mEasyMutexp) ;
mEasyActiveList.insert(easy);
mEasyActiveMap[easy->getCurlHandle()] = easy;
}
@@ -708,6 +833,7 @@ LLCurl::Easy* LLCurl::Multi::allocEasy()
bool LLCurl::Multi::addEasy(Easy* easy)
{
+ LLMutexLock lock(mMutexp) ;
CURLMcode mcode = curl_multi_add_handle(mCurlMultiHandle, easy->getCurlHandle());
check_curl_multi_code(mcode);
//if (mcode != CURLM_OK)
@@ -720,25 +846,156 @@ bool LLCurl::Multi::addEasy(Easy* easy)
void LLCurl::Multi::easyFree(Easy* easy)
{
+ if(mEasyMutexp)
+ {
+ mEasyMutexp->lock() ;
+ }
+
mEasyActiveList.erase(easy);
mEasyActiveMap.erase(easy->getCurlHandle());
+
if (mEasyFreeList.size() < EASY_HANDLE_POOL_SIZE)
- {
- easy->resetState();
+ {
mEasyFreeList.insert(easy);
+
+ if(mEasyMutexp)
+ {
+ mEasyMutexp->unlock() ;
+ }
+
+ easy->resetState();
}
else
{
+ if(mEasyMutexp)
+ {
+ mEasyMutexp->unlock() ;
+ }
delete easy;
}
}
void LLCurl::Multi::removeEasy(Easy* easy)
{
- check_curl_multi_code(curl_multi_remove_handle(mCurlMultiHandle, easy->getCurlHandle()));
+ {
+ LLMutexLock lock(mMutexp) ;
+ check_curl_multi_code(curl_multi_remove_handle(mCurlMultiHandle, easy->getCurlHandle()));
+ }
easyFree(easy);
}
+//------------------------------------------------------------
+//LLCurlThread
+LLCurlThread::CurlRequest::CurlRequest(handle_t handle, LLCurl::Multi* multi, LLCurlThread* curl_thread) :
+ LLQueuedThread::QueuedRequest(handle, LLQueuedThread::PRIORITY_NORMAL, FLAG_AUTO_COMPLETE),
+ mMulti(multi),
+ mCurlThread(curl_thread)
+{
+}
+
+LLCurlThread::CurlRequest::~CurlRequest()
+{
+ if(mMulti)
+ {
+ mCurlThread->deleteMulti(mMulti) ;
+ mMulti = NULL ;
+ }
+}
+
+bool LLCurlThread::CurlRequest::processRequest()
+{
+ bool completed = true ;
+ if(mMulti)
+ {
+ completed = mCurlThread->doMultiPerform(mMulti) ;
+
+ if(!completed)
+ {
+ setPriority(LLQueuedThread::PRIORITY_LOW) ;
+ }
+ }
+
+ return completed ;
+}
+
+void LLCurlThread::CurlRequest::finishRequest(bool completed)
+{
+ if(mMulti->isDead())
+ {
+ mCurlThread->deleteMulti(mMulti) ;
+ }
+ else
+ {
+ mCurlThread->cleanupMulti(mMulti) ; //being idle too long, remove the request.
+ }
+
+ mMulti = NULL ;
+}
+
+LLCurlThread::LLCurlThread(bool threaded) :
+ LLQueuedThread("curlthread", threaded)
+{
+}
+
+//virtual
+LLCurlThread::~LLCurlThread()
+{
+}
+
+S32 LLCurlThread::update(F32 max_time_ms)
+{
+ return LLQueuedThread::update(max_time_ms);
+}
+
+void LLCurlThread::addMulti(LLCurl::Multi* multi)
+{
+ multi->mHandle = generateHandle() ;
+
+ CurlRequest* req = new CurlRequest(multi->mHandle, multi, this) ;
+
+ if (!addRequest(req))
+ {
+ llwarns << "curl request added when the thread is quitted" << llendl;
+ }
+}
+
+void LLCurlThread::killMulti(LLCurl::Multi* multi)
+{
+ if(!multi)
+ {
+ return ;
+ }
+
+ if(multi->isValid())
+ {
+ multi->markDead() ;
+}
+ else
+ {
+ deleteMulti(multi) ;
+ }
+}
+
+//private
+bool LLCurlThread::doMultiPerform(LLCurl::Multi* multi)
+{
+ return multi->doPerform() ;
+}
+
+//private
+void LLCurlThread::deleteMulti(LLCurl::Multi* multi)
+{
+ delete multi ;
+}
+
+//private
+void LLCurlThread::cleanupMulti(LLCurl::Multi* multi)
+{
+ multi->cleanup() ;
+}
+
+//------------------------------------------------------------
+
//static
std::string LLCurl::strerror(CURLcode errorcode)
{
@@ -753,39 +1010,30 @@ LLCurlRequest::LLCurlRequest() :
mActiveMulti(NULL),
mActiveRequestCount(0)
{
- mThreadID = LLThread::currentID();
mProcessing = FALSE;
}
LLCurlRequest::~LLCurlRequest()
{
- llassert_always(mThreadID == LLThread::currentID());
-
//stop all Multi handle background threads
for (curlmulti_set_t::iterator iter = mMultiSet.begin(); iter != mMultiSet.end(); ++iter)
{
- LLCurl::Multi* multi = *iter;
- multi->mQuitting = true;
- if (multi->mThreaded)
- {
- while (!multi->isStopped())
- {
- multi->mSignal->signal();
- apr_sleep(1000);
- }
- }
+ LLCurl::getCurlThread()->killMulti(*iter) ;
}
- for_each(mMultiSet.begin(), mMultiSet.end(), DeletePointer());
+ mMultiSet.clear() ;
}
void LLCurlRequest::addMulti()
{
- llassert_always(mThreadID == LLThread::currentID());
LLCurl::Multi* multi = new LLCurl::Multi();
- if (multi->mThreaded)
+ if(!multi->isValid())
{
- multi->start();
+ LLCurl::getCurlThread()->killMulti(multi) ;
+ mActiveMulti = NULL ;
+ mActiveRequestCount = 0 ;
+ return;
}
+
mMultiSet.insert(multi);
mActiveMulti = multi;
mActiveRequestCount = 0;
@@ -799,7 +1047,12 @@ LLCurl::Easy* LLCurlRequest::allocEasy()
{
addMulti();
}
- llassert_always(mActiveMulti);
+ if(!mActiveMulti)
+ {
+ return NULL ;
+ }
+
+ //llassert_always(mActiveMulti);
++mActiveRequestCount;
LLCurl::Easy* easy = mActiveMulti->allocEasy();
return easy;
@@ -901,7 +1154,6 @@ bool LLCurlRequest::post(const std::string& url,
// Note: call once per frame
S32 LLCurlRequest::process()
{
- llassert_always(mThreadID == LLThread::currentID());
S32 res = 0;
mProcessing = TRUE;
@@ -910,22 +1162,25 @@ S32 LLCurlRequest::process()
{
curlmulti_set_t::iterator curiter = iter++;
LLCurl::Multi* multi = *curiter;
+
+ if(!multi->isValid())
+ {
+ if(multi == mActiveMulti)
+ {
+ mActiveMulti = NULL ;
+ mActiveRequestCount = 0 ;
+ }
+ mMultiSet.erase(curiter) ;
+ LLCurl::getCurlThread()->killMulti(multi) ;
+ continue ;
+ }
+
S32 tres = multi->process();
res += tres;
if (multi != mActiveMulti && tres == 0 && multi->mQueued == 0)
{
mMultiSet.erase(curiter);
- multi->mQuitting = true;
- if (multi->mThreaded)
- {
- while (!multi->isStopped())
- {
- multi->mSignal->signal();
- apr_sleep(1000);
- }
- }
-
- delete multi;
+ LLCurl::getCurlThread()->killMulti(multi);
}
}
mProcessing = FALSE;
@@ -934,15 +1189,27 @@ S32 LLCurlRequest::process()
S32 LLCurlRequest::getQueued()
{
- llassert_always(mThreadID == LLThread::currentID());
S32 queued = 0;
for (curlmulti_set_t::iterator iter = mMultiSet.begin();
iter != mMultiSet.end(); )
{
curlmulti_set_t::iterator curiter = iter++;
LLCurl::Multi* multi = *curiter;
+
+ if(!multi->isValid())
+ {
+ if(multi == mActiveMulti)
+ {
+ mActiveMulti = NULL ;
+ mActiveRequestCount = 0 ;
+ }
+ LLCurl::getCurlThread()->killMulti(multi);
+ mMultiSet.erase(curiter) ;
+ continue ;
+ }
+
queued += multi->mQueued;
- if (multi->mPerformState != LLCurl::Multi::PERFORM_STATE_READY)
+ if (multi->getState() != LLCurl::Multi::STATE_READY)
{
++queued;
}
@@ -959,10 +1226,9 @@ LLCurlEasyRequest::LLCurlEasyRequest()
mResultReturned(false)
{
mMulti = new LLCurl::Multi();
- if (mMulti->mThreaded)
+
+ if(mMulti->isValid())
{
- mMulti->start();
- }
mEasy = mMulti->allocEasy();
if (mEasy)
{
@@ -972,24 +1238,22 @@ LLCurlEasyRequest::LLCurlEasyRequest()
LLProxy::getInstance()->applyProxySettings(mEasy);
}
}
+ else
+ {
+ LLCurl::getCurlThread()->killMulti(mMulti) ;
+ mEasy = NULL ;
+ mMulti = NULL ;
+ }
+}
LLCurlEasyRequest::~LLCurlEasyRequest()
{
- mMulti->mQuitting = true;
- if (mMulti->mThreaded)
- {
- while (!mMulti->isStopped())
- {
- mMulti->mSignal->signal();
- apr_sleep(1000);
- }
- }
- delete mMulti;
+ LLCurl::getCurlThread()->killMulti(mMulti) ;
}
void LLCurlEasyRequest::setopt(CURLoption option, S32 value)
{
- if (mEasy)
+ if (isValid() && mEasy)
{
mEasy->setopt(option, value);
}
@@ -997,7 +1261,7 @@ void LLCurlEasyRequest::setopt(CURLoption option, S32 value)
void LLCurlEasyRequest::setoptString(CURLoption option, const std::string& value)
{
- if (mEasy)
+ if (isValid() && mEasy)
{
mEasy->setoptString(option, value);
}
@@ -1005,7 +1269,7 @@ void LLCurlEasyRequest::setoptString(CURLoption option, const std::string& value
void LLCurlEasyRequest::setPost(char* postdata, S32 size)
{
- if (mEasy)
+ if (isValid() && mEasy)
{
mEasy->setopt(CURLOPT_POST, 1);
mEasy->setopt(CURLOPT_POSTFIELDS, postdata);
@@ -1015,7 +1279,7 @@ void LLCurlEasyRequest::setPost(char* postdata, S32 size)
void LLCurlEasyRequest::setHeaderCallback(curl_header_callback callback, void* userdata)
{
- if (mEasy)
+ if (isValid() && mEasy)
{
mEasy->setopt(CURLOPT_HEADERFUNCTION, (void*)callback);
mEasy->setopt(CURLOPT_HEADERDATA, userdata); // aka CURLOPT_WRITEHEADER
@@ -1024,7 +1288,7 @@ void LLCurlEasyRequest::setHeaderCallback(curl_header_callback callback, void* u
void LLCurlEasyRequest::setWriteCallback(curl_write_callback callback, void* userdata)
{
- if (mEasy)
+ if (isValid() && mEasy)
{
mEasy->setopt(CURLOPT_WRITEFUNCTION, (void*)callback);
mEasy->setopt(CURLOPT_WRITEDATA, userdata);
@@ -1033,7 +1297,7 @@ void LLCurlEasyRequest::setWriteCallback(curl_write_callback callback, void* use
void LLCurlEasyRequest::setReadCallback(curl_read_callback callback, void* userdata)
{
- if (mEasy)
+ if (isValid() && mEasy)
{
mEasy->setopt(CURLOPT_READFUNCTION, (void*)callback);
mEasy->setopt(CURLOPT_READDATA, userdata);
@@ -1042,7 +1306,7 @@ void LLCurlEasyRequest::setReadCallback(curl_read_callback callback, void* userd
void LLCurlEasyRequest::setSSLCtxCallback(curl_ssl_ctx_callback callback, void* userdata)
{
- if (mEasy)
+ if (isValid() && mEasy)
{
mEasy->setopt(CURLOPT_SSL_CTX_FUNCTION, (void*)callback);
mEasy->setopt(CURLOPT_SSL_CTX_DATA, userdata);
@@ -1051,7 +1315,7 @@ void LLCurlEasyRequest::setSSLCtxCallback(curl_ssl_ctx_callback callback, void*
void LLCurlEasyRequest::slist_append(const char* str)
{
- if (mEasy)
+ if (isValid() && mEasy)
{
mEasy->slist_append(str);
}
@@ -1062,7 +1326,7 @@ void LLCurlEasyRequest::sendRequest(const std::string& url)
llassert_always(!mRequestSent);
mRequestSent = true;
lldebugs << url << llendl;
- if (mEasy)
+ if (isValid() && mEasy)
{
mEasy->setHeaders();
mEasy->setoptString(CURLOPT_URL, url);
@@ -1074,25 +1338,24 @@ void LLCurlEasyRequest::requestComplete()
{
llassert_always(mRequestSent);
mRequestSent = false;
- if (mEasy)
+ if (isValid() && mEasy)
{
mMulti->removeEasy(mEasy);
}
}
-void LLCurlEasyRequest::perform()
-{
- mMulti->perform();
-}
-
// Usage: Call getRestult until it returns false (no more messages)
bool LLCurlEasyRequest::getResult(CURLcode* result, LLCurl::TransferInfo* info)
{
- if (mMulti->mPerformState != LLCurl::Multi::PERFORM_STATE_COMPLETED)
+ if(!isValid())
+ {
+ return false ;
+ }
+ if (!mMulti->isCompleted())
{ //we're busy, try again later
return false;
}
- mMulti->mPerformState = LLCurl::Multi::PERFORM_STATE_READY;
+ mMulti->setState(LLCurl::Multi::STATE_READY) ;
if (!mEasy)
{
@@ -1152,7 +1415,7 @@ CURLMsg* LLCurlEasyRequest::info_read(S32* q, LLCurl::TransferInfo* info)
std::string LLCurlEasyRequest::getErrorString()
{
- return mEasy ? std::string(mEasy->getErrorBuffer()) : std::string();
+ return isValid() && mEasy ? std::string(mEasy->getErrorBuffer()) : std::string();
}
////////////////////////////////////////////////////////////////////////////
@@ -1178,10 +1441,11 @@ unsigned long LLCurl::ssl_thread_id(void)
}
#endif
-void LLCurl::initClass(bool multi_threaded)
+void LLCurl::initClass(F32 curl_reuest_timeout, S32 max_number_handles, bool multi_threaded)
{
- sMainThreadID = LLThread::currentID();
- sMultiThreaded = multi_threaded;
+ sCurlRequestTimeOut = curl_reuest_timeout ; //seconds
+ sMaxHandles = max_number_handles ; //max number of handles, (multi handles and easy handles combined).
+
// Do not change this "unless you are familiar with and mean to control
// internal operations of libcurl"
// - http://curl.haxx.se/libcurl/c/curl_global_init.html
@@ -1189,9 +1453,6 @@ void LLCurl::initClass(bool multi_threaded)
check_curl_code(code);
- Easy::sHandleMutex = new LLMutex(NULL);
- Easy::sMultiMutex = new LLMutex(NULL);
-
#if SAFE_SSL
S32 mutex_count = CRYPTO_num_locks();
for (S32 i=0; i<mutex_count; i++)
@@ -1201,31 +1462,118 @@ void LLCurl::initClass(bool multi_threaded)
CRYPTO_set_id_callback(&LLCurl::ssl_thread_id);
CRYPTO_set_locking_callback(&LLCurl::ssl_locking_callback);
#endif
+
+ sCurlThread = new LLCurlThread(multi_threaded) ;
+ if(multi_threaded)
+ {
+ sHandleMutexp = new LLMutex(NULL) ;
+ Easy::sHandleMutexp = new LLMutex(NULL) ;
+ }
}
void LLCurl::cleanupClass()
{
+ sNotQuitting = false; //set quitting
+
+ //shut down curl thread
+ while(1)
+ {
+ if(!sCurlThread->update(1)) //finish all tasks
+ {
+ break ;
+ }
+ }
+ sCurlThread->shutdown() ;
+ delete sCurlThread ;
+ sCurlThread = NULL ;
+
#if SAFE_SSL
CRYPTO_set_locking_callback(NULL);
for_each(sSSLMutex.begin(), sSSLMutex.end(), DeletePointer());
#endif
- delete Easy::sHandleMutex;
- Easy::sHandleMutex = NULL;
- delete Easy::sMultiMutex;
- Easy::sMultiMutex = NULL;
-
for (std::set<CURL*>::iterator iter = Easy::sFreeHandles.begin(); iter != Easy::sFreeHandles.end(); ++iter)
{
CURL* curl = *iter;
- curl_easy_cleanup(curl);
+ LLCurl::deleteEasyHandle(curl);
}
Easy::sFreeHandles.clear();
+ delete Easy::sHandleMutexp ;
+ Easy::sHandleMutexp = NULL ;
+
+ delete sHandleMutexp ;
+ sHandleMutexp = NULL ;
+
llassert(Easy::sActiveHandles.empty());
}
+//static
+CURLM* LLCurl::newMultiHandle()
+{
+ LLMutexLock lock(sHandleMutexp) ;
+
+ if(sTotalHandles + 1 > sMaxHandles)
+ {
+ llwarns << "no more handles available." << llendl ;
+ return NULL ; //failed
+ }
+ sTotalHandles++;
+
+ CURLM* ret = curl_multi_init() ;
+ if(!ret)
+ {
+ llwarns << "curl_multi_init failed." << llendl ;
+ }
+
+ return ret ;
+}
+
+//static
+CURLMcode LLCurl::deleteMultiHandle(CURLM* handle)
+{
+ if(handle)
+ {
+ LLMutexLock lock(sHandleMutexp) ;
+ sTotalHandles-- ;
+ return curl_multi_cleanup(handle) ;
+ }
+ return CURLM_OK ;
+}
+
+//static
+CURL* LLCurl::newEasyHandle()
+{
+ LLMutexLock lock(sHandleMutexp) ;
+
+ if(sTotalHandles + 1 > sMaxHandles)
+ {
+ llwarns << "no more handles available." << llendl ;
+ return NULL ; //failed
+ }
+ sTotalHandles++;
+
+ CURL* ret = curl_easy_init() ;
+ if(!ret)
+ {
+ llwarns << "curl_easy_init failed." << llendl ;
+ }
+
+ return ret ;
+}
+
+//static
+void LLCurl::deleteEasyHandle(CURL* handle)
+{
+ if(handle)
+ {
+ LLMutexLock lock(sHandleMutexp) ;
+ curl_easy_cleanup(handle) ;
+ sTotalHandles-- ;
+ }
+}
+
const unsigned int LLCurl::MAX_REDIRECTS = 5;
// Provide access to LLCurl free functions outside of llcurl.cpp without polluting the global namespace.
diff --git a/indra/llmessage/llcurl.h b/indra/llmessage/llcurl.h
index 87de202717..fd664c0fa1 100755..100644
--- a/indra/llmessage/llcurl.h
+++ b/indra/llmessage/llcurl.h
@@ -42,8 +42,11 @@
#include "lliopipe.h"
#include "llsd.h"
#include "llthread.h"
+#include "llqueuedthread.h"
+#include "llframetimer.h"
class LLMutex;
+class LLCurlThread;
// For whatever reason, this is not typedef'd in curl.h
typedef size_t (*curl_header_callback)(void *ptr, size_t size, size_t nmemb, void *stream);
@@ -56,8 +59,6 @@ public:
class Easy;
class Multi;
- static bool sMultiThreaded;
-
struct TransferInfo
{
TransferInfo() : mSizeDownload(0.0), mTotalTime(0.0), mSpeedDownload(0.0) {}
@@ -162,7 +163,7 @@ public:
/**
* @ brief Initialize LLCurl class
*/
- static void initClass(bool multi_threaded = false);
+ static void initClass(F32 curl_reuest_timeout = 120.f, S32 max_number_handles = 256, bool multi_threaded = false);
/**
* @ brief Cleanup LLCurl class
@@ -181,10 +182,25 @@ public:
static void ssl_locking_callback(int mode, int type, const char *file, int line);
static unsigned long ssl_thread_id(void);
+ static LLCurlThread* getCurlThread() { return sCurlThread ;}
+
+ static CURLM* newMultiHandle() ;
+ static CURLMcode deleteMultiHandle(CURLM* handle) ;
+ static CURL* newEasyHandle() ;
+ static void deleteEasyHandle(CURL* handle) ;
+
private:
static std::string sCAPath;
static std::string sCAFile;
static const unsigned int MAX_REDIRECTS;
+ static LLCurlThread* sCurlThread;
+
+ static LLMutex* sHandleMutexp ;
+ static S32 sTotalHandles ;
+ static S32 sMaxHandles;
+public:
+ static bool sNotQuitting;
+ static F32 sCurlRequestTimeOut;
};
class LLCurl::Easy
@@ -216,7 +232,7 @@ public:
U32 report(CURLcode);
void getTransferInfo(LLCurl::TransferInfo* info);
- void prepRequest(const std::string& url, const std::vector<std::string>& headers, ResponderPtr, S32 time_out = 0, bool post = false);
+ void prepRequest(const std::string& url, const std::vector<std::string>& headers, LLCurl::ResponderPtr, S32 time_out = 0, bool post = false);
const char* getErrorBuffer();
@@ -247,64 +263,120 @@ private:
// Note: char*'s not strings since we pass pointers to curl
std::vector<char*> mStrings;
- ResponderPtr mResponder;
+ LLCurl::ResponderPtr mResponder;
static std::set<CURL*> sFreeHandles;
static std::set<CURL*> sActiveHandles;
- static LLMutex* sHandleMutex;
- static LLMutex* sMultiMutex;
+ static LLMutex* sHandleMutexp ;
};
-class LLCurl::Multi : public LLThread
+class LLCurl::Multi
{
LOG_CLASS(Multi);
+
+ friend class LLCurlThread ;
+
+private:
+ ~Multi();
+
+ void markDead() ;
+ bool doPerform();
+
public:
typedef enum
{
- PERFORM_STATE_READY=0,
- PERFORM_STATE_PERFORMING=1,
- PERFORM_STATE_COMPLETED=2
+ STATE_READY=0,
+ STATE_PERFORMING=1,
+ STATE_COMPLETED=2
} ePerformState;
- Multi();
- ~Multi();
+ Multi(F32 idle_time_out = 0.f);
- Easy* allocEasy();
- bool addEasy(Easy* easy);
+ LLCurl::Easy* allocEasy();
+ bool addEasy(LLCurl::Easy* easy);
+ void removeEasy(LLCurl::Easy* easy);
- void removeEasy(Easy* easy);
+ void lock() ;
+ void unlock() ;
- S32 process();
- void perform();
- void doPerform();
+ void setState(ePerformState state) ;
+ ePerformState getState() ;
- virtual void run();
+ bool isCompleted() ;
+ bool isValid() {return mCurlMultiHandle != NULL ;}
+ bool isDead() {return mDead;}
+
+ bool waitToComplete() ;
+ S32 process();
+
CURLMsg* info_read(S32* msgs_in_queue);
S32 mQueued;
S32 mErrorCount;
- S32 mPerformState;
-
- LLCondition* mSignal;
- bool mQuitting;
- bool mThreaded;
-
private:
- void easyFree(Easy*);
+ void easyFree(LLCurl::Easy*);
+ void cleanup() ;
CURLM* mCurlMultiHandle;
- typedef std::set<Easy*> easy_active_list_t;
+ typedef std::set<LLCurl::Easy*> easy_active_list_t;
easy_active_list_t mEasyActiveList;
- typedef std::map<CURL*, Easy*> easy_active_map_t;
+ typedef std::map<CURL*, LLCurl::Easy*> easy_active_map_t;
easy_active_map_t mEasyActiveMap;
- typedef std::set<Easy*> easy_free_list_t;
+ typedef std::set<LLCurl::Easy*> easy_free_list_t;
easy_free_list_t mEasyFreeList;
+
+ LLQueuedThread::handle_t mHandle ;
+ ePerformState mState;
+
+ BOOL mDead ;
+ LLMutex* mMutexp ;
+ LLMutex* mDeletionMutexp ;
+ LLMutex* mEasyMutexp ;
+ LLFrameTimer mIdleTimer ;
+ F32 mIdleTimeOut;
};
+class LLCurlThread : public LLQueuedThread
+{
+public:
+
+ class CurlRequest : public LLQueuedThread::QueuedRequest
+ {
+ protected:
+ virtual ~CurlRequest(); // use deleteRequest()
+
+ public:
+ CurlRequest(handle_t handle, LLCurl::Multi* multi, LLCurlThread* curl_thread);
+
+ /*virtual*/ bool processRequest();
+ /*virtual*/ void finishRequest(bool completed);
+
+ private:
+ // input
+ LLCurl::Multi* mMulti;
+ LLCurlThread* mCurlThread;
+ };
+ friend class CurlRequest;
+
+public:
+ LLCurlThread(bool threaded = true) ;
+ virtual ~LLCurlThread() ;
+
+ S32 update(F32 max_time_ms);
+
+ void addMulti(LLCurl::Multi* multi) ;
+ void killMulti(LLCurl::Multi* multi) ;
+
+private:
+ bool doMultiPerform(LLCurl::Multi* multi) ;
+ void deleteMulti(LLCurl::Multi* multi) ;
+ void cleanupMulti(LLCurl::Multi* multi) ;
+} ;
+
namespace boost
{
void intrusive_ptr_add_ref(LLCurl::Responder* p);
@@ -339,7 +411,6 @@ private:
LLCurl::Multi* mActiveMulti;
S32 mActiveRequestCount;
BOOL mProcessing;
- U32 mThreadID; // debug
};
class LLCurlEasyRequest
@@ -357,9 +428,11 @@ public:
void slist_append(const char* str);
void sendRequest(const std::string& url);
void requestComplete();
- void perform();
bool getResult(CURLcode* result, LLCurl::TransferInfo* info = NULL);
std::string getErrorString();
+ bool isCompleted() {return mMulti->isCompleted() ;}
+ bool wait() { return mMulti->waitToComplete(); }
+ bool isValid() {return mMulti && mMulti->isValid(); }
LLCurl::Easy* getEasy() const { return mEasy; }
diff --git a/indra/llmessage/llhttpassetstorage.cpp b/indra/llmessage/llhttpassetstorage.cpp
index 2bca517e97..612d765969 100644
--- a/indra/llmessage/llhttpassetstorage.cpp
+++ b/indra/llmessage/llhttpassetstorage.cpp
@@ -232,7 +232,8 @@ LLSD LLHTTPAssetRequest::getFullDetails() const
void LLHTTPAssetRequest::setupCurlHandle()
{
// *NOTE: Similar code exists in mapserver/llcurlutil.cpp JC
- mCurlHandle = curl_easy_init();
+ mCurlHandle = LLCurl::newEasyHandle();
+ llassert_always(mCurlHandle != NULL) ;
// Apply proxy settings if configured to do so
LLProxy::getInstance()->applyProxySettings(mCurlHandle);
@@ -278,7 +279,7 @@ void LLHTTPAssetRequest::setupCurlHandle()
void LLHTTPAssetRequest::cleanupCurlHandle()
{
- curl_easy_cleanup(mCurlHandle);
+ LLCurl::deleteEasyHandle(mCurlHandle);
if (mAssetStoragep)
{
// Terminating a request. Thus upload or download is no longer pending.
@@ -429,12 +430,13 @@ void LLHTTPAssetStorage::_init(const std::string& web_host, const std::string& l
// curl_global_init moved to LLCurl::initClass()
- mCurlMultiHandle = curl_multi_init();
+ mCurlMultiHandle = LLCurl::newMultiHandle() ;
+ llassert_always(mCurlMultiHandle != NULL) ;
}
LLHTTPAssetStorage::~LLHTTPAssetStorage()
{
- curl_multi_cleanup(mCurlMultiHandle);
+ LLCurl::deleteMultiHandle(mCurlMultiHandle);
mCurlMultiHandle = NULL;
// curl_global_cleanup moved to LLCurl::initClass()
diff --git a/indra/llmessage/llhttpclient.cpp b/indra/llmessage/llhttpclient.cpp
index dd4e3a6300..0c325a68aa 100644
--- a/indra/llmessage/llhttpclient.cpp
+++ b/indra/llmessage/llhttpclient.cpp
@@ -158,7 +158,7 @@ namespace
if(fstream.is_open())
{
fstream.seekg(0, std::ios::end);
- U32 fileSize = fstream.tellg();
+ U32 fileSize = (U32)fstream.tellg();
fstream.seekg(0, std::ios::beg);
std::vector<char> fileBuffer(fileSize);
fstream.read(&fileBuffer[0], fileSize);
@@ -228,6 +228,12 @@ static void request(
LLPumpIO::chain_t chain;
LLURLRequest* req = new LLURLRequest(method, url);
+ if(!req->isValid())//failed
+ {
+ delete req ;
+ return ;
+ }
+
req->setSSLVerifyCallback(LLHTTPClient::getCertVerifyCallback(), (void *)req);
@@ -423,7 +429,9 @@ static LLSD blocking_request(
{
lldebugs << "blockingRequest of " << url << llendl;
char curl_error_buffer[CURL_ERROR_SIZE] = "\0";
- CURL* curlp = curl_easy_init();
+ CURL* curlp = LLCurl::newEasyHandle();
+ llassert_always(curlp != NULL) ;
+
LLHTTPBuffer http_buffer;
std::string body_str;
@@ -517,7 +525,7 @@ static LLSD blocking_request(
}
// * Cleanup
- curl_easy_cleanup(curlp);
+ LLCurl::deleteEasyHandle(curlp);
return response;
}
diff --git a/indra/llmessage/lliohttpserver.cpp b/indra/llmessage/lliohttpserver.cpp
index 73e8a69085..987f386aa3 100644
--- a/indra/llmessage/lliohttpserver.cpp
+++ b/indra/llmessage/lliohttpserver.cpp
@@ -818,6 +818,8 @@ LLIOPipe::EStatus LLHTTPResponder::process_impl(
// Copy everything after mLast read to the out.
LLBufferArray::segment_iterator_t seg_iter;
+
+ buffer->lock();
seg_iter = buffer->splitAfter(mLastRead);
if(seg_iter != buffer->endSegment())
{
@@ -838,7 +840,7 @@ LLIOPipe::EStatus LLHTTPResponder::process_impl(
}
#endif
}
-
+ buffer->unlock();
//
// *FIX: get rid of extra bytes off the end
//
diff --git a/indra/llmessage/lliopipe.cpp b/indra/llmessage/lliopipe.cpp
index 6e4eec74a6..8f827f7a30 100644
--- a/indra/llmessage/lliopipe.cpp
+++ b/indra/llmessage/lliopipe.cpp
@@ -75,6 +75,12 @@ LLIOPipe::~LLIOPipe()
//lldebugs << "destroying LLIOPipe" << llendl;
}
+//virtual
+bool LLIOPipe::isValid()
+{
+ return true ;
+}
+
// static
std::string LLIOPipe::lookupStatusString(EStatus status)
{
diff --git a/indra/llmessage/lliopipe.h b/indra/llmessage/lliopipe.h
index 8e656b6da1..cbd17b5a3d 100644
--- a/indra/llmessage/lliopipe.h
+++ b/indra/llmessage/lliopipe.h
@@ -231,6 +231,8 @@ public:
*/
virtual ~LLIOPipe();
+ virtual bool isValid() ;
+
protected:
/**
* @brief Base Constructor.
diff --git a/indra/llmessage/lliosocket.cpp b/indra/llmessage/lliosocket.cpp
index 54ceab3422..d5b4d45821 100644
--- a/indra/llmessage/lliosocket.cpp
+++ b/indra/llmessage/lliosocket.cpp
@@ -445,6 +445,7 @@ LLIOPipe::EStatus LLIOSocketWriter::process_impl(
// efficient - not only because writev() is better, but also
// because we won't have to do as much work to find the start
// address.
+ buffer->lock();
LLBufferArray::segment_iterator_t it;
LLBufferArray::segment_iterator_t end = buffer->endSegment();
LLSegment segment;
@@ -524,6 +525,8 @@ LLIOPipe::EStatus LLIOSocketWriter::process_impl(
}
}
+ buffer->unlock();
+
PUMP_DEBUG;
if(done && eos)
{
diff --git a/indra/llmessage/llmime.cpp b/indra/llmessage/llmime.cpp
index 943a734927..9d9c4ebd68 100644
--- a/indra/llmessage/llmime.cpp
+++ b/indra/llmessage/llmime.cpp
@@ -388,7 +388,7 @@ bool LLMimeParser::Impl::parseHeaders(
// not to read past limit when we get() the newline.
S32 max_get = llmin((S32)LINE_BUFFER_LENGTH, limit - mScanCount - 1);
istr.getline(mBuffer, max_get, '\r');
- mScanCount += istr.gcount();
+ mScanCount += (S32)istr.gcount();
int c = istr.get();
if(EOF == c)
{
@@ -496,7 +496,7 @@ void LLMimeParser::Impl::scanPastSeparator(
// past limit when we get() the newline.
S32 max_get = llmin((S32)LINE_BUFFER_LENGTH, limit - mScanCount - 1);
istr.getline(mBuffer, max_get, '\r');
- mScanCount += istr.gcount();
+ mScanCount += (S32)istr.gcount();
if(istr.gcount() >= LINE_BUFFER_LENGTH - 1)
{
// that's way too long to be a separator, so ignore it.
diff --git a/indra/llmessage/llpumpio.cpp b/indra/llmessage/llpumpio.cpp
index a8d2a0a224..f3ef4f2684 100644
--- a/indra/llmessage/llpumpio.cpp
+++ b/indra/llmessage/llpumpio.cpp
@@ -195,7 +195,7 @@ bool LLPumpIO::prime(apr_pool_t* pool)
return ((pool == NULL) ? false : true);
}
-bool LLPumpIO::addChain(const chain_t& chain, F32 timeout)
+bool LLPumpIO::addChain(const chain_t& chain, F32 timeout, bool has_curl_request)
{
LLMemType m1(LLMemType::MTYPE_IO_PUMP);
if(chain.empty()) return false;
@@ -204,8 +204,10 @@ bool LLPumpIO::addChain(const chain_t& chain, F32 timeout)
LLScopedLock lock(mChainsMutex);
#endif
LLChainInfo info;
+ info.mHasCurlRequest = has_curl_request;
info.setTimeoutSeconds(timeout);
info.mData = LLIOPipe::buffer_ptr_t(new LLBufferArray);
+ info.mData->setThreaded(has_curl_request);
LLLinkInfo link;
#if LL_DEBUG_PIPE_TYPE_IN_PUMP
lldebugs << "LLPumpIO::addChain() " << chain[0] << " '"
@@ -440,6 +442,15 @@ void LLPumpIO::pump()
static LLFastTimer::DeclareTimer FTM_PUMP_IO("Pump IO");
+LLPumpIO::current_chain_t LLPumpIO::removeRunningChain(LLPumpIO::current_chain_t& run_chain)
+{
+ std::for_each(
+ (*run_chain).mDescriptors.begin(),
+ (*run_chain).mDescriptors.end(),
+ ll_delete_apr_pollset_fd_client_data());
+ return mRunningChains.erase(run_chain);
+}
+
//timeout is in microseconds
void LLPumpIO::pump(const S32& poll_timeout)
{
@@ -585,10 +596,16 @@ void LLPumpIO::pump(const S32& poll_timeout)
// << (*run_chain).mChainLinks[0].mPipe
// << " because we reached the end." << llendl;
#endif
- run_chain = mRunningChains.erase(run_chain);
+ run_chain = removeRunningChain(run_chain);
continue;
}
}
+ else if(isChainExpired(*run_chain))
+ {
+ run_chain = removeRunningChain(run_chain);
+ continue;
+ }
+
PUMP_DEBUG;
if((*run_chain).mLock)
{
@@ -696,11 +713,7 @@ void LLPumpIO::pump(const S32& poll_timeout)
PUMP_DEBUG;
// This chain is done. Clean up any allocated memory and
// erase the chain info.
- std::for_each(
- (*run_chain).mDescriptors.begin(),
- (*run_chain).mDescriptors.end(),
- ll_delete_apr_pollset_fd_client_data());
- run_chain = mRunningChains.erase(run_chain);
+ run_chain = removeRunningChain(run_chain);
// *NOTE: may not always need to rebuild the pollset.
mRebuildPollset = true;
@@ -1095,6 +1108,24 @@ void LLPumpIO::processChain(LLChainInfo& chain)
PUMP_DEBUG;
}
+bool LLPumpIO::isChainExpired(LLChainInfo& chain)
+{
+ if(!chain.mHasCurlRequest)
+ {
+ return false ;
+ }
+
+ for(links_t::iterator iter = chain.mChainLinks.begin(); iter != chain.mChainLinks.end(); ++iter)
+ {
+ if(!(*iter).mPipe->isValid())
+ {
+ return true ;
+ }
+ }
+
+ return false ;
+}
+
bool LLPumpIO::handleChainError(
LLChainInfo& chain,
LLIOPipe::EStatus error)
@@ -1136,6 +1167,9 @@ bool LLPumpIO::handleChainError(
#endif
keep_going = false;
break;
+ case LLIOPipe::STATUS_EXPIRED:
+ keep_going = false;
+ break ;
default:
if(LLIOPipe::isSuccess(error))
{
@@ -1157,7 +1191,8 @@ bool LLPumpIO::handleChainError(
LLPumpIO::LLChainInfo::LLChainInfo() :
mInit(false),
mLock(0),
- mEOS(false)
+ mEOS(false),
+ mHasCurlRequest(false)
{
LLMemType m1(LLMemType::MTYPE_IO_PUMP);
mTimer.setTimerExpirySec(DEFAULT_CHAIN_EXPIRY_SECS);
diff --git a/indra/llmessage/llpumpio.h b/indra/llmessage/llpumpio.h
index 9303c9d7fc..d2c5d37571 100644
--- a/indra/llmessage/llpumpio.h
+++ b/indra/llmessage/llpumpio.h
@@ -111,9 +111,10 @@ public:
* @param chain The pipes for the chain
* @param timeout The number of seconds in the future to
* expire. Pass in 0.0f to never expire.
+ * @param has_curl_request The chain contains LLURLRequest if true.
* @return Returns true if anything was added to the pump.
*/
- bool addChain(const chain_t& chain, F32 timeout);
+ bool addChain(const chain_t& chain, F32 timeout, bool has_curl_request = false);
/**
* @brief Struct to associate a pipe with it's buffer io indexes.
@@ -356,12 +357,13 @@ protected:
// basic member data
bool mInit;
+ bool mEOS;
+ bool mHasCurlRequest;
S32 mLock;
LLFrameTimer mTimer;
links_t::iterator mHead;
links_t mChainLinks;
- LLIOPipe::buffer_ptr_t mData;
- bool mEOS;
+ LLIOPipe::buffer_ptr_t mData;
LLSD mContext;
// tracking inside the pump
@@ -402,7 +404,7 @@ protected:
protected:
void initialize(apr_pool_t* pool);
void cleanup();
-
+ current_chain_t removeRunningChain(current_chain_t& chain) ;
/**
* @brief Given the internal state of the chains, rebuild the pollset
* @see setConditional()
@@ -429,6 +431,9 @@ protected:
*/
bool handleChainError(LLChainInfo& chain, LLIOPipe::EStatus error);
+ //if the chain is expired, remove it
+ bool isChainExpired(LLChainInfo& chain) ;
+
public:
/**
* @brief Return number of running chains.
diff --git a/indra/llmessage/llsdmessage.cpp b/indra/llmessage/llsdmessage.cpp
index 9148c9dd15..1c93c12d99 100644
--- a/indra/llmessage/llsdmessage.cpp
+++ b/indra/llmessage/llsdmessage.cpp
@@ -88,7 +88,7 @@ bool LLSDMessage::httpListener(const LLSD& request)
request,
url, "POST", reply, error),
LLSD(), // headers
- timeout);
+ (F32)timeout);
return false;
}
diff --git a/indra/llmessage/llsdmessagebuilder.cpp b/indra/llmessage/llsdmessagebuilder.cpp
index 2698a271ee..615221e0ad 100644
--- a/indra/llmessage/llsdmessagebuilder.cpp
+++ b/indra/llmessage/llsdmessagebuilder.cpp
@@ -317,7 +317,7 @@ void LLSDMessageBuilder::copyFromMessageData(const LLMsgData& data)
// S64 not supported in LLSD so we just truncate it
case MVT_S64:
- addS32(varname, *(S64*)mvci.getData());
+ addS32(varname, (S32)*(S64*)mvci.getData());
break;
case MVT_F32:
diff --git a/indra/llmessage/llsdmessagereader.cpp b/indra/llmessage/llsdmessagereader.cpp
index 304a692cdf..3d8ca2ad9f 100644
--- a/indra/llmessage/llsdmessagereader.cpp
+++ b/indra/llmessage/llsdmessagereader.cpp
@@ -291,9 +291,10 @@ S32 getElementSize(const LLSD& llsd)
case LLSD::TypeMap:
case LLSD::TypeArray:
case LLSD::TypeUndefined:
+ default: // TypeLLSDTypeEnd, TypeLLSDNumTypes, etc.
return 0;
}
- return 0;
+ //return 0;
}
//virtual
diff --git a/indra/llmessage/llsdrpcclient.h b/indra/llmessage/llsdrpcclient.h
index 9fb49a5c33..0cecf4f688 100644
--- a/indra/llmessage/llsdrpcclient.h
+++ b/indra/llmessage/llsdrpcclient.h
@@ -240,9 +240,16 @@ public:
virtual bool build(LLPumpIO::chain_t& chain, LLSD context) const
{
lldebugs << "LLSDRPCClientFactory::build" << llendl;
- LLIOPipe::ptr_t service(new Client);
- chain.push_back(service);
LLURLRequest* http(new LLURLRequest(LLURLRequest::HTTP_POST));
+ if(!http->isValid())
+ {
+ llwarns << "Creating LLURLRequest failed." << llendl ;
+ delete http;
+ return false;
+ }
+
+ LLIOPipe::ptr_t service(new Client);
+ chain.push_back(service);
LLIOPipe::ptr_t http_pipe(http);
http->addHeader("Content-Type: text/llsd");
if(mURL.empty())
@@ -283,9 +290,16 @@ public:
virtual bool build(LLPumpIO::chain_t& chain, LLSD context) const
{
lldebugs << "LLXMLSDRPCClientFactory::build" << llendl;
- LLIOPipe::ptr_t service(new Client);
- chain.push_back(service);
+
LLURLRequest* http(new LLURLRequest(LLURLRequest::HTTP_POST));
+ if(!http->isValid())
+ {
+ llwarns << "Creating LLURLRequest failed." << llendl ;
+ delete http;
+ return false ;
+ }
+ LLIOPipe::ptr_t service(new Client);
+ chain.push_back(service);
LLIOPipe::ptr_t http_pipe(http);
http->addHeader("Content-Type: text/xml");
if(mURL.empty())
diff --git a/indra/llmessage/llurlrequest.cpp b/indra/llmessage/llurlrequest.cpp
index fa03bb7512..a16f5c7bf0 100644
--- a/indra/llmessage/llurlrequest.cpp
+++ b/indra/llmessage/llurlrequest.cpp
@@ -64,7 +64,7 @@ public:
~LLURLRequestDetail();
std::string mURL;
LLCurlEasyRequest* mCurlRequest;
- LLBufferArray* mResponseBuffer;
+ LLIOPipe::buffer_ptr_t mResponseBuffer;
LLChannelDescriptors mChannels;
U8* mLastRead;
U32 mBodyLimit;
@@ -75,7 +75,6 @@ public:
LLURLRequestDetail::LLURLRequestDetail() :
mCurlRequest(NULL),
- mResponseBuffer(NULL),
mLastRead(NULL),
mBodyLimit(0),
mByteAccumulator(0),
@@ -84,13 +83,18 @@ LLURLRequestDetail::LLURLRequestDetail() :
{
LLMemType m1(LLMemType::MTYPE_IO_URL_REQUEST);
mCurlRequest = new LLCurlEasyRequest();
+
+ if(!mCurlRequest->isValid()) //failed.
+ {
+ delete mCurlRequest ;
+ mCurlRequest = NULL ;
+ }
}
LLURLRequestDetail::~LLURLRequestDetail()
{
LLMemType m1(LLMemType::MTYPE_IO_URL_REQUEST);
delete mCurlRequest;
- mResponseBuffer = NULL;
mLastRead = NULL;
}
@@ -170,6 +174,7 @@ LLURLRequest::~LLURLRequest()
{
LLMemType m1(LLMemType::MTYPE_IO_URL_REQUEST);
delete mDetail;
+ mDetail = NULL ;
}
void LLURLRequest::setURL(const std::string& url)
@@ -251,12 +256,24 @@ void LLURLRequest::allowCookies()
mDetail->mCurlRequest->setoptString(CURLOPT_COOKIEFILE, "");
}
+//virtual
+bool LLURLRequest::isValid()
+{
+ return mDetail->mCurlRequest && mDetail->mCurlRequest->isValid();
+}
+
// virtual
LLIOPipe::EStatus LLURLRequest::handleError(
LLIOPipe::EStatus status,
LLPumpIO* pump)
{
LLMemType m1(LLMemType::MTYPE_IO_URL_REQUEST);
+
+ if(!isValid())
+ {
+ return STATUS_EXPIRED ;
+ }
+
if(mCompletionCallback && pump)
{
LLURLRequestComplete* complete = NULL;
@@ -325,7 +342,7 @@ LLIOPipe::EStatus LLURLRequest::process_impl(
// *FIX: bit of a hack, but it should work. The configure and
// callback method expect this information to be ready.
- mDetail->mResponseBuffer = buffer.get();
+ mDetail->mResponseBuffer = buffer;
mDetail->mChannels = channels;
if(!configure())
{
@@ -344,7 +361,10 @@ LLIOPipe::EStatus LLURLRequest::process_impl(
static LLFastTimer::DeclareTimer FTM_URL_PERFORM("Perform");
{
LLFastTimer t(FTM_URL_PERFORM);
- mDetail->mCurlRequest->perform();
+ if(!mDetail->mCurlRequest->wait())
+ {
+ return status ;
+ }
}
while(1)
@@ -439,6 +459,12 @@ void LLURLRequest::initialize()
LLMemType m1(LLMemType::MTYPE_IO_URL_REQUEST);
mState = STATE_INITIALIZED;
mDetail = new LLURLRequestDetail;
+
+ if(!isValid())
+ {
+ return ;
+ }
+
mDetail->mCurlRequest->setopt(CURLOPT_NOSIGNAL, 1);
mDetail->mCurlRequest->setWriteCallback(&downCallback, (void*)this);
mDetail->mCurlRequest->setReadCallback(&upCallback, (void*)this);
diff --git a/indra/llmessage/llurlrequest.h b/indra/llmessage/llurlrequest.h
index ec5c2c1941..44d358d906 100644
--- a/indra/llmessage/llurlrequest.h
+++ b/indra/llmessage/llurlrequest.h
@@ -188,6 +188,8 @@ public:
*/
void allowCookies();
+ /*virtual*/ bool isValid() ;
+
public:
/**
* @brief Give this pipe a chance to handle a generated error
diff --git a/indra/llmessage/llxfer.h b/indra/llmessage/llxfer.h
index 989e8b2cab..f9348eb11f 100644
--- a/indra/llmessage/llxfer.h
+++ b/indra/llmessage/llxfer.h
@@ -29,6 +29,7 @@
#include "message.h"
#include "lltimer.h"
+#include "llextendedstatus.h"
const S32 LL_XFER_LARGE_PAYLOAD = 7680;
diff --git a/indra/llmessage/message.cpp b/indra/llmessage/message.cpp
index d0b0e178b8..6a425cfe98 100644
--- a/indra/llmessage/message.cpp
+++ b/indra/llmessage/message.cpp
@@ -3147,7 +3147,7 @@ bool LLMessageSystem::generateDigestForWindowAndUUIDs(char* digest, const S32 wi
LL_ERRS("Messaging") << "Trying to generate complex digest on a machine without a shared secret!" << llendl;
}
- U32 now = time(NULL);
+ U32 now = (U32)time(NULL);
now /= window;
@@ -3167,7 +3167,7 @@ bool LLMessageSystem::isMatchingDigestForWindowAndUUIDs(const char* digest, cons
}
char our_digest[MD5HEX_STR_SIZE]; /* Flawfinder: ignore */
- U32 now = time(NULL);
+ U32 now = (U32)time(NULL);
now /= window;
@@ -3213,7 +3213,7 @@ bool LLMessageSystem::generateDigestForWindow(char* digest, const S32 window) co
LL_ERRS("Messaging") << "Trying to generate simple digest on a machine without a shared secret!" << llendl;
}
- U32 now = time(NULL);
+ U32 now = (U32)time(NULL);
now /= window;
diff --git a/indra/llmessage/tests/testrunner.py b/indra/llmessage/tests/testrunner.py
index f2c841532a..5b9beb359b 100644
--- a/indra/llmessage/tests/testrunner.py
+++ b/indra/llmessage/tests/testrunner.py
@@ -35,7 +35,7 @@ import re
import errno
import socket
-VERBOSE = os.environ.get("INTEGRATION_TEST_VERBOSE", "1") # default to verbose
+VERBOSE = os.environ.get("INTEGRATION_TEST_VERBOSE", "0") # default to quiet
# Support usage such as INTEGRATION_TEST_VERBOSE=off -- distressing to user if
# that construct actually turns on verbosity...
VERBOSE = not re.match(r"(0|off|false|quiet)$", VERBOSE, re.IGNORECASE)