summaryrefslogtreecommitdiff
path: root/indra/llmessage
diff options
context:
space:
mode:
Diffstat (limited to 'indra/llmessage')
-rw-r--r--indra/llmessage/llbuffer.cpp90
-rw-r--r--indra/llmessage/llbuffer.h25
-rw-r--r--indra/llmessage/llbufferstream.cpp8
-rw-r--r--indra/llmessage/llcurl.cpp426
-rw-r--r--indra/llmessage/llcurl.h30
-rw-r--r--indra/llmessage/llhttpassetstorage.cpp10
-rw-r--r--indra/llmessage/llhttpclient.cpp12
-rw-r--r--indra/llmessage/lliohttpserver.cpp4
-rw-r--r--indra/llmessage/lliopipe.cpp6
-rw-r--r--indra/llmessage/lliopipe.h2
-rw-r--r--indra/llmessage/lliosocket.cpp3
-rw-r--r--indra/llmessage/llpumpio.cpp51
-rw-r--r--indra/llmessage/llpumpio.h13
-rw-r--r--indra/llmessage/llsdrpcclient.h22
-rw-r--r--indra/llmessage/llurlrequest.cpp30
-rw-r--r--indra/llmessage/llurlrequest.h2
-rw-r--r--indra/llmessage/tests/llsdmessage_test.cpp36
17 files changed, 619 insertions, 151 deletions
diff --git a/indra/llmessage/llbuffer.cpp b/indra/llmessage/llbuffer.cpp
index 0316797f00..250cace6e9 100644
--- a/indra/llmessage/llbuffer.cpp
+++ b/indra/llmessage/llbuffer.cpp
@@ -32,6 +32,9 @@
#include "llmath.h"
#include "llmemtype.h"
#include "llstl.h"
+#include "llthread.h"
+
+#define ASSERT_LLBUFFERARRAY_MUTEX_LOCKED llassert(!mMutexp || mMutexp->isSelfLocked());
/**
* LLSegment
@@ -224,7 +227,8 @@ void LLHeapBuffer::allocate(S32 size)
* LLBufferArray
*/
LLBufferArray::LLBufferArray() :
- mNextBaseChannel(0)
+ mNextBaseChannel(0),
+ mMutexp(NULL)
{
LLMemType m1(LLMemType::MTYPE_IO_BUFFER);
}
@@ -233,6 +237,8 @@ LLBufferArray::~LLBufferArray()
{
LLMemType m1(LLMemType::MTYPE_IO_BUFFER);
std::for_each(mBuffers.begin(), mBuffers.end(), DeletePointer());
+
+ delete mMutexp;
}
// static
@@ -243,14 +249,57 @@ LLChannelDescriptors LLBufferArray::makeChannelConsumer(
return rv;
}
+void LLBufferArray::lock()
+{
+ if(mMutexp)
+ {
+ mMutexp->lock() ;
+ }
+}
+
+void LLBufferArray::unlock()
+{
+ if(mMutexp)
+ {
+ mMutexp->unlock() ;
+ }
+}
+
+LLMutex* LLBufferArray::getMutex()
+{
+ return mMutexp ;
+}
+
+void LLBufferArray::setThreaded(bool threaded)
+{
+ if(threaded)
+ {
+ if(!mMutexp)
+ {
+ mMutexp = new LLMutex(NULL);
+ }
+ }
+ else
+ {
+ if(mMutexp)
+ {
+ delete mMutexp ;
+ mMutexp = NULL ;
+ }
+ }
+}
+
LLChannelDescriptors LLBufferArray::nextChannel()
{
LLChannelDescriptors rv(mNextBaseChannel++);
return rv;
}
+//mMutexp should be locked before calling this.
S32 LLBufferArray::capacity() const
{
+ ASSERT_LLBUFFERARRAY_MUTEX_LOCKED
+
S32 total = 0;
const_buffer_iterator_t iter = mBuffers.begin();
const_buffer_iterator_t end = mBuffers.end();
@@ -263,6 +312,8 @@ S32 LLBufferArray::capacity() const
bool LLBufferArray::append(S32 channel, const U8* src, S32 len)
{
+ LLMutexLock lock(mMutexp) ;
+
LLMemType m1(LLMemType::MTYPE_IO_BUFFER);
std::vector<LLSegment> segments;
if(copyIntoBuffers(channel, src, len, segments))
@@ -273,8 +324,11 @@ bool LLBufferArray::append(S32 channel, const U8* src, S32 len)
return false;
}
+//mMutexp should be locked before calling this.
bool LLBufferArray::prepend(S32 channel, const U8* src, S32 len)
{
+ ASSERT_LLBUFFERARRAY_MUTEX_LOCKED
+
LLMemType m1(LLMemType::MTYPE_IO_BUFFER);
std::vector<LLSegment> segments;
if(copyIntoBuffers(channel, src, len, segments))
@@ -293,6 +347,8 @@ bool LLBufferArray::insertAfter(
{
LLMemType m1(LLMemType::MTYPE_IO_BUFFER);
std::vector<LLSegment> segments;
+
+ LLMutexLock lock(mMutexp) ;
if(mSegments.end() != segment)
{
++segment;
@@ -305,8 +361,11 @@ bool LLBufferArray::insertAfter(
return false;
}
+//mMutexp should be locked before calling this.
LLBufferArray::segment_iterator_t LLBufferArray::splitAfter(U8* address)
{
+ ASSERT_LLBUFFERARRAY_MUTEX_LOCKED
+
LLMemType m1(LLMemType::MTYPE_IO_BUFFER);
segment_iterator_t end = mSegments.end();
segment_iterator_t it = getSegment(address);
@@ -335,20 +394,26 @@ LLBufferArray::segment_iterator_t LLBufferArray::splitAfter(U8* address)
return rv;
}
+//mMutexp should be locked before calling this.
LLBufferArray::segment_iterator_t LLBufferArray::beginSegment()
{
+ ASSERT_LLBUFFERARRAY_MUTEX_LOCKED
return mSegments.begin();
}
+//mMutexp should be locked before calling this.
LLBufferArray::segment_iterator_t LLBufferArray::endSegment()
{
+ ASSERT_LLBUFFERARRAY_MUTEX_LOCKED
return mSegments.end();
}
+//mMutexp should be locked before calling this.
LLBufferArray::segment_iterator_t LLBufferArray::constructSegmentAfter(
U8* address,
LLSegment& segment)
{
+ ASSERT_LLBUFFERARRAY_MUTEX_LOCKED
LLMemType m1(LLMemType::MTYPE_IO_BUFFER);
segment_iterator_t rv = mSegments.begin();
segment_iterator_t end = mSegments.end();
@@ -395,8 +460,10 @@ LLBufferArray::segment_iterator_t LLBufferArray::constructSegmentAfter(
return rv;
}
+//mMutexp should be locked before calling this.
LLBufferArray::segment_iterator_t LLBufferArray::getSegment(U8* address)
{
+ ASSERT_LLBUFFERARRAY_MUTEX_LOCKED
segment_iterator_t end = mSegments.end();
if(!address)
{
@@ -414,9 +481,11 @@ LLBufferArray::segment_iterator_t LLBufferArray::getSegment(U8* address)
return end;
}
+//mMutexp should be locked before calling this.
LLBufferArray::const_segment_iterator_t LLBufferArray::getSegment(
U8* address) const
{
+ ASSERT_LLBUFFERARRAY_MUTEX_LOCKED
const_segment_iterator_t end = mSegments.end();
if(!address)
{
@@ -466,6 +535,8 @@ S32 LLBufferArray::countAfter(S32 channel, U8* start) const
S32 count = 0;
S32 offset = 0;
const_segment_iterator_t it;
+
+ LLMutexLock lock(mMutexp) ;
const_segment_iterator_t end = mSegments.end();
if(start)
{
@@ -517,6 +588,8 @@ U8* LLBufferArray::readAfter(
len = 0;
S32 bytes_to_copy = 0;
const_segment_iterator_t it;
+
+ LLMutexLock lock(mMutexp) ;
const_segment_iterator_t end = mSegments.end();
if(start)
{
@@ -568,6 +641,7 @@ U8* LLBufferArray::seek(
U8* start,
S32 delta) const
{
+ ASSERT_LLBUFFERARRAY_MUTEX_LOCKED
LLMemType m1(LLMemType::MTYPE_IO_BUFFER);
const_segment_iterator_t it;
const_segment_iterator_t end = mSegments.end();
@@ -709,9 +783,14 @@ U8* LLBufferArray::seek(
return rv;
}
+//test use only
bool LLBufferArray::takeContents(LLBufferArray& source)
{
LLMemType m1(LLMemType::MTYPE_IO_BUFFER);
+
+ LLMutexLock lock(mMutexp);
+ source.lock();
+
std::copy(
source.mBuffers.begin(),
source.mBuffers.end(),
@@ -723,13 +802,17 @@ bool LLBufferArray::takeContents(LLBufferArray& source)
std::back_insert_iterator<segment_list_t>(mSegments));
source.mSegments.clear();
source.mNextBaseChannel = 0;
+ source.unlock();
+
return true;
}
+//mMutexp should be locked before calling this.
LLBufferArray::segment_iterator_t LLBufferArray::makeSegment(
S32 channel,
S32 len)
{
+ ASSERT_LLBUFFERARRAY_MUTEX_LOCKED
LLMemType m1(LLMemType::MTYPE_IO_BUFFER);
// start at the end of the buffers, because it is the most likely
// to have free space.
@@ -765,8 +848,10 @@ LLBufferArray::segment_iterator_t LLBufferArray::makeSegment(
return send;
}
+//mMutexp should be locked before calling this.
bool LLBufferArray::eraseSegment(const segment_iterator_t& erase_iter)
{
+ ASSERT_LLBUFFERARRAY_MUTEX_LOCKED
LLMemType m1(LLMemType::MTYPE_IO_BUFFER);
// Find out which buffer contains the segment, and if it is found,
@@ -792,13 +877,14 @@ bool LLBufferArray::eraseSegment(const segment_iterator_t& erase_iter)
return rv;
}
-
+//mMutexp should be locked before calling this.
bool LLBufferArray::copyIntoBuffers(
S32 channel,
const U8* src,
S32 len,
std::vector<LLSegment>& segments)
{
+ ASSERT_LLBUFFERARRAY_MUTEX_LOCKED
LLMemType m1(LLMemType::MTYPE_IO_BUFFER);
if(!src || !len) return false;
S32 copied = 0;
diff --git a/indra/llmessage/llbuffer.h b/indra/llmessage/llbuffer.h
index 1c42b6fbc6..ccdb9fa7ee 100644
--- a/indra/llmessage/llbuffer.h
+++ b/indra/llmessage/llbuffer.h
@@ -39,6 +39,7 @@
#include <list>
#include <vector>
+class LLMutex;
/**
* @class LLChannelDescriptors
* @brief A way simple interface to accesss channels inside a buffer
@@ -564,6 +565,29 @@ public:
* @return Returns true on success.
*/
bool eraseSegment(const segment_iterator_t& iter);
+
+ /**
+ * @brief Lock the mutex if it exists
+ * This method locks mMutexp to make accessing LLBufferArray thread-safe
+ */
+ void lock();
+
+ /**
+ * @brief Unlock the mutex if it exists
+ */
+ void unlock();
+
+ /**
+ * @brief Return mMutexp
+ */
+ LLMutex* getMutex();
+
+ /**
+ * @brief Set LLBufferArray to be shared across threads or not
+ * This method is to create mMutexp if is threaded.
+ * @param threaded Indicates this LLBufferArray instance is shared across threads if true.
+ */
+ void setThreaded(bool threaded);
//@}
protected:
@@ -595,6 +619,7 @@ protected:
S32 mNextBaseChannel;
buffer_list_t mBuffers;
segment_list_t mSegments;
+ LLMutex* mMutexp;
};
#endif // LL_LLBUFFER_H
diff --git a/indra/llmessage/llbufferstream.cpp b/indra/llmessage/llbufferstream.cpp
index 6257983c43..8d8ad05ad5 100644
--- a/indra/llmessage/llbufferstream.cpp
+++ b/indra/llmessage/llbufferstream.cpp
@@ -31,6 +31,7 @@
#include "llbuffer.h"
#include "llmemtype.h"
+#include "llthread.h"
static const S32 DEFAULT_OUTPUT_SEGMENT_SIZE = 1024 * 4;
@@ -62,6 +63,7 @@ int LLBufferStreamBuf::underflow()
return EOF;
}
+ LLMutexLock lock(mBuffer->getMutex());
LLBufferArray::segment_iterator_t iter;
LLBufferArray::segment_iterator_t end = mBuffer->endSegment();
U8* last_pos = (U8*)gptr();
@@ -149,6 +151,7 @@ int LLBufferStreamBuf::overflow(int c)
// since we got here, we have a buffer, and we have a character to
// put on it.
LLBufferArray::segment_iterator_t it;
+ LLMutexLock lock(mBuffer->getMutex());
it = mBuffer->makeSegment(mChannels.out(), DEFAULT_OUTPUT_SEGMENT_SIZE);
if(it != mBuffer->endSegment())
{
@@ -210,6 +213,7 @@ int LLBufferStreamBuf::sync()
// *NOTE: I bet we could just --address if address is not NULL.
// Need to think about that.
+ LLMutexLock lock(mBuffer->getMutex());
address = mBuffer->seek(mChannels.out(), address, -1);
if(address)
{
@@ -273,6 +277,8 @@ streampos LLBufferStreamBuf::seekoff(
// NULL is fine
break;
}
+
+ LLMutexLock lock(mBuffer->getMutex());
address = mBuffer->seek(mChannels.in(), base_addr, off);
if(address)
{
@@ -304,6 +310,8 @@ streampos LLBufferStreamBuf::seekoff(
// NULL is fine
break;
}
+
+ LLMutexLock lock(mBuffer->getMutex());
address = mBuffer->seek(mChannels.out(), base_addr, off);
if(address)
{
diff --git a/indra/llmessage/llcurl.cpp b/indra/llmessage/llcurl.cpp
index 7ca25d07fc..3bcaffc275 100644
--- a/indra/llmessage/llcurl.cpp
+++ b/indra/llmessage/llcurl.cpp
@@ -72,10 +72,9 @@
static const U32 EASY_HANDLE_POOL_SIZE = 5;
static const S32 MULTI_PERFORM_CALL_REPEAT = 5;
-static const S32 CURL_REQUEST_TIMEOUT = 30; // seconds
+static const S32 CURL_REQUEST_TIMEOUT = 30; // seconds per operation
static const S32 MAX_ACTIVE_REQUEST_COUNT = 100;
-static
// DEBUG //
S32 gCurlEasyCount = 0;
S32 gCurlMultiCount = 0;
@@ -87,6 +86,11 @@ std::vector<LLMutex*> LLCurl::sSSLMutex;
std::string LLCurl::sCAPath;
std::string LLCurl::sCAFile;
LLCurlThread* LLCurl::sCurlThread = NULL ;
+LLMutex* LLCurl::sHandleMutexp = NULL ;
+S32 LLCurl::sTotalHandles = 0 ;
+bool LLCurl::sNotQuitting = true;
+F32 LLCurl::sCurlRequestTimeOut = 120.f; //seonds
+S32 LLCurl::sMaxHandles = 256; //max number of handles, (multi handles and easy handles combined).
void check_curl_code(CURLcode code)
{
@@ -219,14 +223,20 @@ namespace boost
std::set<CURL*> LLCurl::Easy::sFreeHandles;
std::set<CURL*> LLCurl::Easy::sActiveHandles;
+LLMutex* LLCurl::Easy::sHandleMutexp = NULL ;
//static
CURL* LLCurl::Easy::allocEasyHandle()
{
+ llassert(LLCurl::getCurlThread()) ;
+
CURL* ret = NULL;
+
+ LLMutexLock lock(sHandleMutexp) ;
+
if (sFreeHandles.empty())
{
- ret = curl_easy_init();
+ ret = LLCurl::newEasyHandle();
}
else
{
@@ -246,15 +256,27 @@ CURL* LLCurl::Easy::allocEasyHandle()
//static
void LLCurl::Easy::releaseEasyHandle(CURL* handle)
{
+ static const S32 MAX_NUM_FREE_HANDLES = 32 ;
+
if (!handle)
{
- llerrs << "handle cannot be NULL!" << llendl;
+ return ; //handle allocation failed.
+ //llerrs << "handle cannot be NULL!" << llendl;
}
+ LLMutexLock lock(sHandleMutexp) ;
if (sActiveHandles.find(handle) != sActiveHandles.end())
{
sActiveHandles.erase(handle);
- sFreeHandles.insert(handle);
+
+ if(sFreeHandles.size() < MAX_NUM_FREE_HANDLES)
+ {
+ sFreeHandles.insert(handle);
+ }
+ else
+ {
+ LLCurl::deleteEasyHandle(handle) ;
+ }
}
else
{
@@ -297,6 +319,14 @@ LLCurl::Easy::~Easy()
--gCurlEasyCount;
curl_slist_free_all(mHeaders);
for_each(mStrings.begin(), mStrings.end(), DeletePointerArray());
+
+ if (mResponder && LLCurl::sNotQuitting) //aborted
+ {
+ std::string reason("Request timeout, aborted.") ;
+ mResponder->completedRaw(408, //HTTP_REQUEST_TIME_OUT, timeout, abort
+ reason, mChannels, mOutput);
+ }
+ mResponder = NULL;
}
void LLCurl::Easy::resetState()
@@ -469,6 +499,7 @@ void LLCurl::Easy::prepRequest(const std::string& url,
LLProxy::getInstance()->applyProxySettings(this);
mOutput.reset(new LLBufferArray);
+ mOutput->setThreaded(true);
setopt(CURLOPT_WRITEFUNCTION, (void*)&curlWriteCallback);
setopt(CURLOPT_WRITEDATA, (void*)this);
@@ -512,36 +543,56 @@ void LLCurl::Easy::prepRequest(const std::string& url,
}
////////////////////////////////////////////////////////////////////////////
-
-LLCurl::Multi::Multi()
+LLCurl::Multi::Multi(F32 idle_time_out)
: mQueued(0),
mErrorCount(0),
mState(STATE_READY),
mDead(FALSE),
mMutexp(NULL),
- mDeletionMutexp(NULL)
+ mDeletionMutexp(NULL),
+ mEasyMutexp(NULL)
{
- mCurlMultiHandle = curl_multi_init();
+ mCurlMultiHandle = LLCurl::newMultiHandle();
if (!mCurlMultiHandle)
{
llwarns << "curl_multi_init() returned NULL! Easy handles: " << gCurlEasyCount << " Multi handles: " << gCurlMultiCount << llendl;
- mCurlMultiHandle = curl_multi_init();
+ mCurlMultiHandle = LLCurl::newMultiHandle();
}
- llassert_always(mCurlMultiHandle);
-
- if(LLCurl::getCurlThread()->getThreaded())
+ //llassert_always(mCurlMultiHandle);
+
+ if(mCurlMultiHandle)
{
- mMutexp = new LLMutex(NULL) ;
- mDeletionMutexp = new LLMutex(NULL) ;
- }
- LLCurl::getCurlThread()->addMulti(this) ;
+ if(LLCurl::getCurlThread()->getThreaded())
+ {
+ mMutexp = new LLMutex(NULL) ;
+ mDeletionMutexp = new LLMutex(NULL) ;
+ mEasyMutexp = new LLMutex(NULL) ;
+ }
+ LLCurl::getCurlThread()->addMulti(this) ;
- ++gCurlMultiCount;
+ mIdleTimeOut = idle_time_out ;
+ if(mIdleTimeOut < LLCurl::sCurlRequestTimeOut)
+ {
+ mIdleTimeOut = LLCurl::sCurlRequestTimeOut ;
+ }
+
+ ++gCurlMultiCount;
+ }
}
LLCurl::Multi::~Multi()
{
+ cleanup() ;
+}
+
+void LLCurl::Multi::cleanup()
+{
+ if(!mCurlMultiHandle)
+ {
+ return ; //nothing to clean.
+ }
+
// Clean up active
for(easy_active_list_t::iterator iter = mEasyActiveList.begin();
iter != mEasyActiveList.end(); ++iter)
@@ -557,14 +608,22 @@ LLCurl::Multi::~Multi()
for_each(mEasyFreeList.begin(), mEasyFreeList.end(), DeletePointer());
mEasyFreeList.clear();
- check_curl_multi_code(curl_multi_cleanup(mCurlMultiHandle));
+ check_curl_multi_code(LLCurl::deleteMultiHandle(mCurlMultiHandle));
+ mCurlMultiHandle = NULL ;
delete mMutexp ;
mMutexp = NULL ;
delete mDeletionMutexp ;
mDeletionMutexp = NULL ;
-
+ delete mEasyMutexp ;
+ mEasyMutexp = NULL ;
+
+ mQueued = 0 ;
+ mState = STATE_COMPLETED;
+
--gCurlMultiCount;
+
+ return ;
}
void LLCurl::Multi::lock()
@@ -585,39 +644,27 @@ void LLCurl::Multi::unlock()
void LLCurl::Multi::markDead()
{
- if(mDeletionMutexp)
- {
- mDeletionMutexp->lock() ;
- }
-
+ LLMutexLock lock(mDeletionMutexp) ;
+
mDead = TRUE ;
-
- if(mDeletionMutexp)
- {
- mDeletionMutexp->unlock() ;
- }
+ LLCurl::getCurlThread()->setPriority(mHandle, LLQueuedThread::PRIORITY_URGENT) ;
}
void LLCurl::Multi::setState(LLCurl::Multi::ePerformState state)
{
lock() ;
mState = state ;
+ unlock() ;
+
if(mState == STATE_READY)
{
LLCurl::getCurlThread()->setPriority(mHandle, LLQueuedThread::PRIORITY_NORMAL) ;
- }
- unlock() ;
+ }
}
LLCurl::Multi::ePerformState LLCurl::Multi::getState()
{
- ePerformState state ;
-
- lock() ;
- state = mState ;
- unlock() ;
-
- return state ;
+ return mState;
}
bool LLCurl::Multi::isCompleted()
@@ -627,27 +674,30 @@ bool LLCurl::Multi::isCompleted()
bool LLCurl::Multi::waitToComplete()
{
+ if(!isValid())
+ {
+ return true ;
+ }
+
if(!mMutexp) //not threaded
{
doPerform() ;
return true ;
}
- bool completed ;
-
- lock() ;
- completed = (STATE_COMPLETED == mState) ;
+ bool completed = (STATE_COMPLETED == mState) ;
if(!completed)
{
- LLCurl::getCurlThread()->setPriority(mHandle, LLQueuedThread::PRIORITY_URGENT) ;
+ LLCurl::getCurlThread()->setPriority(mHandle, LLQueuedThread::PRIORITY_HIGH) ;
}
- unlock() ;
-
+
return completed;
}
CURLMsg* LLCurl::Multi::info_read(S32* msgs_in_queue)
{
+ LLMutexLock lock(mMutexp) ;
+
CURLMsg* curlmsg = curl_multi_info_read(mCurlMultiHandle, msgs_in_queue);
return curlmsg;
}
@@ -655,10 +705,8 @@ CURLMsg* LLCurl::Multi::info_read(S32* msgs_in_queue)
//return true if dead
bool LLCurl::Multi::doPerform()
{
- if(mDeletionMutexp)
- {
- mDeletionMutexp->lock() ;
- }
+ LLMutexLock lock(mDeletionMutexp) ;
+
bool dead = mDead ;
if(mDead)
@@ -675,6 +723,11 @@ bool LLCurl::Multi::doPerform()
call_count < MULTI_PERFORM_CALL_REPEAT;
call_count++)
{
+ LLMutexLock lock(mMutexp) ;
+
+ //WARNING: curl_multi_perform will block for many hundreds of milliseconds
+ // NEVER call this from the main thread, and NEVER allow the main thread to
+ // wait on a mutex held by this thread while curl_multi_perform is executing
CURLMcode code = curl_multi_perform(mCurlMultiHandle, &q);
if (CURLM_CALL_MULTI_PERFORM != code || q == 0)
{
@@ -685,12 +738,12 @@ bool LLCurl::Multi::doPerform()
}
mQueued = q;
- setState(STATE_COMPLETED) ;
+ setState(STATE_COMPLETED) ;
+ mIdleTimer.reset() ;
}
-
- if(mDeletionMutexp)
+ else if(mIdleTimer.getElapsedTimeF32() > mIdleTimeOut) //idle for too long, remove it.
{
- mDeletionMutexp->unlock() ;
+ dead = true ;
}
return dead ;
@@ -698,6 +751,11 @@ bool LLCurl::Multi::doPerform()
S32 LLCurl::Multi::process()
{
+ if(!isValid())
+ {
+ return 0 ;
+ }
+
waitToComplete() ;
if (getState() != STATE_COMPLETED)
@@ -715,10 +773,19 @@ S32 LLCurl::Multi::process()
if (msg->msg == CURLMSG_DONE)
{
U32 response = 0;
- easy_active_map_t::iterator iter = mEasyActiveMap.find(msg->easy_handle);
- if (iter != mEasyActiveMap.end())
+ Easy* easy = NULL ;
+
+ {
+ LLMutexLock lock(mEasyMutexp) ;
+ easy_active_map_t::iterator iter = mEasyActiveMap.find(msg->easy_handle);
+ if (iter != mEasyActiveMap.end())
+ {
+ easy = iter->second;
+ }
+ }
+
+ if(easy)
{
- Easy* easy = iter->second;
response = easy->report(msg->data.result);
removeEasy(easy);
}
@@ -743,19 +810,21 @@ S32 LLCurl::Multi::process()
LLCurl::Easy* LLCurl::Multi::allocEasy()
{
- Easy* easy = 0;
+ Easy* easy = 0;
if (mEasyFreeList.empty())
- {
+ {
easy = Easy::getEasy();
}
else
{
+ LLMutexLock lock(mEasyMutexp) ;
easy = *(mEasyFreeList.begin());
mEasyFreeList.erase(easy);
}
if (easy)
{
+ LLMutexLock lock(mEasyMutexp) ;
mEasyActiveList.insert(easy);
mEasyActiveMap[easy->getCurlHandle()] = easy;
}
@@ -764,6 +833,7 @@ LLCurl::Easy* LLCurl::Multi::allocEasy()
bool LLCurl::Multi::addEasy(Easy* easy)
{
+ LLMutexLock lock(mMutexp) ;
CURLMcode mcode = curl_multi_add_handle(mCurlMultiHandle, easy->getCurlHandle());
check_curl_multi_code(mcode);
//if (mcode != CURLM_OK)
@@ -776,22 +846,41 @@ bool LLCurl::Multi::addEasy(Easy* easy)
void LLCurl::Multi::easyFree(Easy* easy)
{
+ if(mEasyMutexp)
+ {
+ mEasyMutexp->lock() ;
+ }
+
mEasyActiveList.erase(easy);
mEasyActiveMap.erase(easy->getCurlHandle());
+
if (mEasyFreeList.size() < EASY_HANDLE_POOL_SIZE)
- {
- easy->resetState();
+ {
mEasyFreeList.insert(easy);
+
+ if(mEasyMutexp)
+ {
+ mEasyMutexp->unlock() ;
+ }
+
+ easy->resetState();
}
else
{
+ if(mEasyMutexp)
+ {
+ mEasyMutexp->unlock() ;
+ }
delete easy;
}
}
void LLCurl::Multi::removeEasy(Easy* easy)
{
- check_curl_multi_code(curl_multi_remove_handle(mCurlMultiHandle, easy->getCurlHandle()));
+ {
+ LLMutexLock lock(mMutexp) ;
+ check_curl_multi_code(curl_multi_remove_handle(mCurlMultiHandle, easy->getCurlHandle()));
+ }
easyFree(easy);
}
@@ -819,7 +908,11 @@ bool LLCurlThread::CurlRequest::processRequest()
if(mMulti)
{
completed = mCurlThread->doMultiPerform(mMulti) ;
- setPriority(LLQueuedThread::PRIORITY_LOW) ;
+
+ if(!completed)
+ {
+ setPriority(LLQueuedThread::PRIORITY_LOW) ;
+ }
}
return completed ;
@@ -827,7 +920,15 @@ bool LLCurlThread::CurlRequest::processRequest()
void LLCurlThread::CurlRequest::finishRequest(bool completed)
{
- mCurlThread->deleteMulti(mMulti) ;
+ if(mMulti->isDead())
+ {
+ mCurlThread->deleteMulti(mMulti) ;
+ }
+ else
+ {
+ mCurlThread->cleanupMulti(mMulti) ; //being idle too long, remove the request.
+ }
+
mMulti = NULL ;
}
@@ -841,7 +942,7 @@ LLCurlThread::~LLCurlThread()
{
}
-S32 LLCurlThread::update(U32 max_time_ms)
+S32 LLCurlThread::update(F32 max_time_ms)
{
return LLQueuedThread::update(max_time_ms);
}
@@ -860,7 +961,19 @@ void LLCurlThread::addMulti(LLCurl::Multi* multi)
void LLCurlThread::killMulti(LLCurl::Multi* multi)
{
- multi->markDead() ;
+ if(!multi)
+ {
+ return ;
+ }
+
+ if(multi->isValid())
+ {
+ multi->markDead() ;
+ }
+ else
+ {
+ deleteMulti(multi) ;
+ }
}
//private
@@ -874,6 +987,13 @@ void LLCurlThread::deleteMulti(LLCurl::Multi* multi)
{
delete multi ;
}
+
+//private
+void LLCurlThread::cleanupMulti(LLCurl::Multi* multi)
+{
+ multi->cleanup() ;
+}
+
//------------------------------------------------------------
//static
@@ -906,7 +1026,14 @@ LLCurlRequest::~LLCurlRequest()
void LLCurlRequest::addMulti()
{
LLCurl::Multi* multi = new LLCurl::Multi();
-
+ if(!multi->isValid())
+ {
+ LLCurl::getCurlThread()->killMulti(multi) ;
+ mActiveMulti = NULL ;
+ mActiveRequestCount = 0 ;
+ return;
+ }
+
mMultiSet.insert(multi);
mActiveMulti = multi;
mActiveRequestCount = 0;
@@ -920,7 +1047,12 @@ LLCurl::Easy* LLCurlRequest::allocEasy()
{
addMulti();
}
- llassert_always(mActiveMulti);
+ if(!mActiveMulti)
+ {
+ return NULL ;
+ }
+
+ //llassert_always(mActiveMulti);
++mActiveRequestCount;
LLCurl::Easy* easy = mActiveMulti->allocEasy();
return easy;
@@ -1030,6 +1162,19 @@ S32 LLCurlRequest::process()
{
curlmulti_set_t::iterator curiter = iter++;
LLCurl::Multi* multi = *curiter;
+
+ if(!multi->isValid())
+ {
+ if(multi == mActiveMulti)
+ {
+ mActiveMulti = NULL ;
+ mActiveRequestCount = 0 ;
+ }
+ mMultiSet.erase(curiter) ;
+ LLCurl::getCurlThread()->killMulti(multi) ;
+ continue ;
+ }
+
S32 tres = multi->process();
res += tres;
if (multi != mActiveMulti && tres == 0 && multi->mQueued == 0)
@@ -1050,6 +1195,19 @@ S32 LLCurlRequest::getQueued()
{
curlmulti_set_t::iterator curiter = iter++;
LLCurl::Multi* multi = *curiter;
+
+ if(!multi->isValid())
+ {
+ if(multi == mActiveMulti)
+ {
+ mActiveMulti = NULL ;
+ mActiveRequestCount = 0 ;
+ }
+ LLCurl::getCurlThread()->killMulti(multi);
+ mMultiSet.erase(curiter) ;
+ continue ;
+ }
+
queued += multi->mQueued;
if (multi->getState() != LLCurl::Multi::STATE_READY)
{
@@ -1069,13 +1227,22 @@ LLCurlEasyRequest::LLCurlEasyRequest()
{
mMulti = new LLCurl::Multi();
- mEasy = mMulti->allocEasy();
- if (mEasy)
+ if(mMulti->isValid())
{
- mEasy->setErrorBuffer();
- mEasy->setCA();
- // Set proxy settings if configured to do so.
- LLProxy::getInstance()->applyProxySettings(mEasy);
+ mEasy = mMulti->allocEasy();
+ if (mEasy)
+ {
+ mEasy->setErrorBuffer();
+ mEasy->setCA();
+ // Set proxy settings if configured to do so.
+ LLProxy::getInstance()->applyProxySettings(mEasy);
+ }
+ }
+ else
+ {
+ LLCurl::getCurlThread()->killMulti(mMulti) ;
+ mEasy = NULL ;
+ mMulti = NULL ;
}
}
@@ -1086,7 +1253,7 @@ LLCurlEasyRequest::~LLCurlEasyRequest()
void LLCurlEasyRequest::setopt(CURLoption option, S32 value)
{
- if (mEasy)
+ if (isValid() && mEasy)
{
mEasy->setopt(option, value);
}
@@ -1094,7 +1261,7 @@ void LLCurlEasyRequest::setopt(CURLoption option, S32 value)
void LLCurlEasyRequest::setoptString(CURLoption option, const std::string& value)
{
- if (mEasy)
+ if (isValid() && mEasy)
{
mEasy->setoptString(option, value);
}
@@ -1102,7 +1269,7 @@ void LLCurlEasyRequest::setoptString(CURLoption option, const std::string& value
void LLCurlEasyRequest::setPost(char* postdata, S32 size)
{
- if (mEasy)
+ if (isValid() && mEasy)
{
mEasy->setopt(CURLOPT_POST, 1);
mEasy->setopt(CURLOPT_POSTFIELDS, postdata);
@@ -1112,7 +1279,7 @@ void LLCurlEasyRequest::setPost(char* postdata, S32 size)
void LLCurlEasyRequest::setHeaderCallback(curl_header_callback callback, void* userdata)
{
- if (mEasy)
+ if (isValid() && mEasy)
{
mEasy->setopt(CURLOPT_HEADERFUNCTION, (void*)callback);
mEasy->setopt(CURLOPT_HEADERDATA, userdata); // aka CURLOPT_WRITEHEADER
@@ -1121,7 +1288,7 @@ void LLCurlEasyRequest::setHeaderCallback(curl_header_callback callback, void* u
void LLCurlEasyRequest::setWriteCallback(curl_write_callback callback, void* userdata)
{
- if (mEasy)
+ if (isValid() && mEasy)
{
mEasy->setopt(CURLOPT_WRITEFUNCTION, (void*)callback);
mEasy->setopt(CURLOPT_WRITEDATA, userdata);
@@ -1130,7 +1297,7 @@ void LLCurlEasyRequest::setWriteCallback(curl_write_callback callback, void* use
void LLCurlEasyRequest::setReadCallback(curl_read_callback callback, void* userdata)
{
- if (mEasy)
+ if (isValid() && mEasy)
{
mEasy->setopt(CURLOPT_READFUNCTION, (void*)callback);
mEasy->setopt(CURLOPT_READDATA, userdata);
@@ -1139,7 +1306,7 @@ void LLCurlEasyRequest::setReadCallback(curl_read_callback callback, void* userd
void LLCurlEasyRequest::setSSLCtxCallback(curl_ssl_ctx_callback callback, void* userdata)
{
- if (mEasy)
+ if (isValid() && mEasy)
{
mEasy->setopt(CURLOPT_SSL_CTX_FUNCTION, (void*)callback);
mEasy->setopt(CURLOPT_SSL_CTX_DATA, userdata);
@@ -1148,7 +1315,7 @@ void LLCurlEasyRequest::setSSLCtxCallback(curl_ssl_ctx_callback callback, void*
void LLCurlEasyRequest::slist_append(const char* str)
{
- if (mEasy)
+ if (isValid() && mEasy)
{
mEasy->slist_append(str);
}
@@ -1159,7 +1326,7 @@ void LLCurlEasyRequest::sendRequest(const std::string& url)
llassert_always(!mRequestSent);
mRequestSent = true;
lldebugs << url << llendl;
- if (mEasy)
+ if (isValid() && mEasy)
{
mEasy->setHeaders();
mEasy->setoptString(CURLOPT_URL, url);
@@ -1171,7 +1338,7 @@ void LLCurlEasyRequest::requestComplete()
{
llassert_always(mRequestSent);
mRequestSent = false;
- if (mEasy)
+ if (isValid() && mEasy)
{
mMulti->removeEasy(mEasy);
}
@@ -1180,6 +1347,10 @@ void LLCurlEasyRequest::requestComplete()
// Usage: Call getRestult until it returns false (no more messages)
bool LLCurlEasyRequest::getResult(CURLcode* result, LLCurl::TransferInfo* info)
{
+ if(!isValid())
+ {
+ return false ;
+ }
if (!mMulti->isCompleted())
{ //we're busy, try again later
return false;
@@ -1244,7 +1415,7 @@ CURLMsg* LLCurlEasyRequest::info_read(S32* q, LLCurl::TransferInfo* info)
std::string LLCurlEasyRequest::getErrorString()
{
- return mEasy ? std::string(mEasy->getErrorBuffer()) : std::string();
+ return isValid() && mEasy ? std::string(mEasy->getErrorBuffer()) : std::string();
}
////////////////////////////////////////////////////////////////////////////
@@ -1270,8 +1441,11 @@ unsigned long LLCurl::ssl_thread_id(void)
}
#endif
-void LLCurl::initClass(bool multi_threaded)
+void LLCurl::initClass(F32 curl_reuest_timeout, S32 max_number_handles, bool multi_threaded)
{
+ sCurlRequestTimeOut = curl_reuest_timeout ; //seconds
+ sMaxHandles = max_number_handles ; //max number of handles, (multi handles and easy handles combined).
+
// Do not change this "unless you are familiar with and mean to control
// internal operations of libcurl"
// - http://curl.haxx.se/libcurl/c/curl_global_init.html
@@ -1290,10 +1464,17 @@ void LLCurl::initClass(bool multi_threaded)
#endif
sCurlThread = new LLCurlThread(multi_threaded) ;
+ if(multi_threaded)
+ {
+ sHandleMutexp = new LLMutex(NULL) ;
+ Easy::sHandleMutexp = new LLMutex(NULL) ;
+ }
}
void LLCurl::cleanupClass()
{
+ sNotQuitting = false; //set quitting
+
//shut down curl thread
while(1)
{
@@ -1314,14 +1495,85 @@ void LLCurl::cleanupClass()
for (std::set<CURL*>::iterator iter = Easy::sFreeHandles.begin(); iter != Easy::sFreeHandles.end(); ++iter)
{
CURL* curl = *iter;
- curl_easy_cleanup(curl);
+ LLCurl::deleteEasyHandle(curl);
}
Easy::sFreeHandles.clear();
+ delete Easy::sHandleMutexp ;
+ Easy::sHandleMutexp = NULL ;
+
+ delete sHandleMutexp ;
+ sHandleMutexp = NULL ;
+
llassert(Easy::sActiveHandles.empty());
}
+//static
+CURLM* LLCurl::newMultiHandle()
+{
+ LLMutexLock lock(sHandleMutexp) ;
+
+ if(sTotalHandles + 1 > sMaxHandles)
+ {
+ llwarns << "no more handles available." << llendl ;
+ return NULL ; //failed
+ }
+ sTotalHandles++;
+
+ CURLM* ret = curl_multi_init() ;
+ if(!ret)
+ {
+ llwarns << "curl_multi_init failed." << llendl ;
+ }
+
+ return ret ;
+}
+
+//static
+CURLMcode LLCurl::deleteMultiHandle(CURLM* handle)
+{
+ if(handle)
+ {
+ LLMutexLock lock(sHandleMutexp) ;
+ sTotalHandles-- ;
+ return curl_multi_cleanup(handle) ;
+ }
+ return CURLM_OK ;
+}
+
+//static
+CURL* LLCurl::newEasyHandle()
+{
+ LLMutexLock lock(sHandleMutexp) ;
+
+ if(sTotalHandles + 1 > sMaxHandles)
+ {
+ llwarns << "no more handles available." << llendl ;
+ return NULL ; //failed
+ }
+ sTotalHandles++;
+
+ CURL* ret = curl_easy_init() ;
+ if(!ret)
+ {
+ llwarns << "curl_easy_init failed." << llendl ;
+ }
+
+ return ret ;
+}
+
+//static
+void LLCurl::deleteEasyHandle(CURL* handle)
+{
+ if(handle)
+ {
+ LLMutexLock lock(sHandleMutexp) ;
+ curl_easy_cleanup(handle) ;
+ sTotalHandles-- ;
+ }
+}
+
const unsigned int LLCurl::MAX_REDIRECTS = 5;
// Provide access to LLCurl free functions outside of llcurl.cpp without polluting the global namespace.
diff --git a/indra/llmessage/llcurl.h b/indra/llmessage/llcurl.h
index a275db3e53..fd664c0fa1 100644
--- a/indra/llmessage/llcurl.h
+++ b/indra/llmessage/llcurl.h
@@ -43,6 +43,7 @@
#include "llsd.h"
#include "llthread.h"
#include "llqueuedthread.h"
+#include "llframetimer.h"
class LLMutex;
class LLCurlThread;
@@ -162,7 +163,7 @@ public:
/**
* @ brief Initialize LLCurl class
*/
- static void initClass(bool multi_threaded = false);
+ static void initClass(F32 curl_reuest_timeout = 120.f, S32 max_number_handles = 256, bool multi_threaded = false);
/**
* @ brief Cleanup LLCurl class
@@ -182,11 +183,24 @@ public:
static unsigned long ssl_thread_id(void);
static LLCurlThread* getCurlThread() { return sCurlThread ;}
+
+ static CURLM* newMultiHandle() ;
+ static CURLMcode deleteMultiHandle(CURLM* handle) ;
+ static CURL* newEasyHandle() ;
+ static void deleteEasyHandle(CURL* handle) ;
+
private:
static std::string sCAPath;
static std::string sCAFile;
static const unsigned int MAX_REDIRECTS;
static LLCurlThread* sCurlThread;
+
+ static LLMutex* sHandleMutexp ;
+ static S32 sTotalHandles ;
+ static S32 sMaxHandles;
+public:
+ static bool sNotQuitting;
+ static F32 sCurlRequestTimeOut;
};
class LLCurl::Easy
@@ -253,6 +267,7 @@ private:
static std::set<CURL*> sFreeHandles;
static std::set<CURL*> sActiveHandles;
+ static LLMutex* sHandleMutexp ;
};
class LLCurl::Multi
@@ -276,7 +291,7 @@ public:
STATE_COMPLETED=2
} ePerformState;
- Multi();
+ Multi(F32 idle_time_out = 0.f);
LLCurl::Easy* allocEasy();
bool addEasy(LLCurl::Easy* easy);
@@ -287,7 +302,10 @@ public:
void setState(ePerformState state) ;
ePerformState getState() ;
+
bool isCompleted() ;
+ bool isValid() {return mCurlMultiHandle != NULL ;}
+ bool isDead() {return mDead;}
bool waitToComplete() ;
@@ -300,6 +318,7 @@ public:
private:
void easyFree(LLCurl::Easy*);
+ void cleanup() ;
CURLM* mCurlMultiHandle;
@@ -316,6 +335,9 @@ private:
BOOL mDead ;
LLMutex* mMutexp ;
LLMutex* mDeletionMutexp ;
+ LLMutex* mEasyMutexp ;
+ LLFrameTimer mIdleTimer ;
+ F32 mIdleTimeOut;
};
class LLCurlThread : public LLQueuedThread
@@ -344,7 +366,7 @@ public:
LLCurlThread(bool threaded = true) ;
virtual ~LLCurlThread() ;
- S32 update(U32 max_time_ms);
+ S32 update(F32 max_time_ms);
void addMulti(LLCurl::Multi* multi) ;
void killMulti(LLCurl::Multi* multi) ;
@@ -352,6 +374,7 @@ public:
private:
bool doMultiPerform(LLCurl::Multi* multi) ;
void deleteMulti(LLCurl::Multi* multi) ;
+ void cleanupMulti(LLCurl::Multi* multi) ;
} ;
namespace boost
@@ -409,6 +432,7 @@ public:
std::string getErrorString();
bool isCompleted() {return mMulti->isCompleted() ;}
bool wait() { return mMulti->waitToComplete(); }
+ bool isValid() {return mMulti && mMulti->isValid(); }
LLCurl::Easy* getEasy() const { return mEasy; }
diff --git a/indra/llmessage/llhttpassetstorage.cpp b/indra/llmessage/llhttpassetstorage.cpp
index 2bca517e97..612d765969 100644
--- a/indra/llmessage/llhttpassetstorage.cpp
+++ b/indra/llmessage/llhttpassetstorage.cpp
@@ -232,7 +232,8 @@ LLSD LLHTTPAssetRequest::getFullDetails() const
void LLHTTPAssetRequest::setupCurlHandle()
{
// *NOTE: Similar code exists in mapserver/llcurlutil.cpp JC
- mCurlHandle = curl_easy_init();
+ mCurlHandle = LLCurl::newEasyHandle();
+ llassert_always(mCurlHandle != NULL) ;
// Apply proxy settings if configured to do so
LLProxy::getInstance()->applyProxySettings(mCurlHandle);
@@ -278,7 +279,7 @@ void LLHTTPAssetRequest::setupCurlHandle()
void LLHTTPAssetRequest::cleanupCurlHandle()
{
- curl_easy_cleanup(mCurlHandle);
+ LLCurl::deleteEasyHandle(mCurlHandle);
if (mAssetStoragep)
{
// Terminating a request. Thus upload or download is no longer pending.
@@ -429,12 +430,13 @@ void LLHTTPAssetStorage::_init(const std::string& web_host, const std::string& l
// curl_global_init moved to LLCurl::initClass()
- mCurlMultiHandle = curl_multi_init();
+ mCurlMultiHandle = LLCurl::newMultiHandle() ;
+ llassert_always(mCurlMultiHandle != NULL) ;
}
LLHTTPAssetStorage::~LLHTTPAssetStorage()
{
- curl_multi_cleanup(mCurlMultiHandle);
+ LLCurl::deleteMultiHandle(mCurlMultiHandle);
mCurlMultiHandle = NULL;
// curl_global_cleanup moved to LLCurl::initClass()
diff --git a/indra/llmessage/llhttpclient.cpp b/indra/llmessage/llhttpclient.cpp
index dd4e3a6300..231cb7ca8f 100644
--- a/indra/llmessage/llhttpclient.cpp
+++ b/indra/llmessage/llhttpclient.cpp
@@ -228,6 +228,12 @@ static void request(
LLPumpIO::chain_t chain;
LLURLRequest* req = new LLURLRequest(method, url);
+ if(!req->isValid())//failed
+ {
+ delete req ;
+ return ;
+ }
+
req->setSSLVerifyCallback(LLHTTPClient::getCertVerifyCallback(), (void *)req);
@@ -423,7 +429,9 @@ static LLSD blocking_request(
{
lldebugs << "blockingRequest of " << url << llendl;
char curl_error_buffer[CURL_ERROR_SIZE] = "\0";
- CURL* curlp = curl_easy_init();
+ CURL* curlp = LLCurl::newEasyHandle();
+ llassert_always(curlp != NULL) ;
+
LLHTTPBuffer http_buffer;
std::string body_str;
@@ -517,7 +525,7 @@ static LLSD blocking_request(
}
// * Cleanup
- curl_easy_cleanup(curlp);
+ LLCurl::deleteEasyHandle(curlp);
return response;
}
diff --git a/indra/llmessage/lliohttpserver.cpp b/indra/llmessage/lliohttpserver.cpp
index 73e8a69085..987f386aa3 100644
--- a/indra/llmessage/lliohttpserver.cpp
+++ b/indra/llmessage/lliohttpserver.cpp
@@ -818,6 +818,8 @@ LLIOPipe::EStatus LLHTTPResponder::process_impl(
// Copy everything after mLast read to the out.
LLBufferArray::segment_iterator_t seg_iter;
+
+ buffer->lock();
seg_iter = buffer->splitAfter(mLastRead);
if(seg_iter != buffer->endSegment())
{
@@ -838,7 +840,7 @@ LLIOPipe::EStatus LLHTTPResponder::process_impl(
}
#endif
}
-
+ buffer->unlock();
//
// *FIX: get rid of extra bytes off the end
//
diff --git a/indra/llmessage/lliopipe.cpp b/indra/llmessage/lliopipe.cpp
index 6e4eec74a6..8f827f7a30 100644
--- a/indra/llmessage/lliopipe.cpp
+++ b/indra/llmessage/lliopipe.cpp
@@ -75,6 +75,12 @@ LLIOPipe::~LLIOPipe()
//lldebugs << "destroying LLIOPipe" << llendl;
}
+//virtual
+bool LLIOPipe::isValid()
+{
+ return true ;
+}
+
// static
std::string LLIOPipe::lookupStatusString(EStatus status)
{
diff --git a/indra/llmessage/lliopipe.h b/indra/llmessage/lliopipe.h
index 8e656b6da1..cbd17b5a3d 100644
--- a/indra/llmessage/lliopipe.h
+++ b/indra/llmessage/lliopipe.h
@@ -231,6 +231,8 @@ public:
*/
virtual ~LLIOPipe();
+ virtual bool isValid() ;
+
protected:
/**
* @brief Base Constructor.
diff --git a/indra/llmessage/lliosocket.cpp b/indra/llmessage/lliosocket.cpp
index 54ceab3422..d5b4d45821 100644
--- a/indra/llmessage/lliosocket.cpp
+++ b/indra/llmessage/lliosocket.cpp
@@ -445,6 +445,7 @@ LLIOPipe::EStatus LLIOSocketWriter::process_impl(
// efficient - not only because writev() is better, but also
// because we won't have to do as much work to find the start
// address.
+ buffer->lock();
LLBufferArray::segment_iterator_t it;
LLBufferArray::segment_iterator_t end = buffer->endSegment();
LLSegment segment;
@@ -524,6 +525,8 @@ LLIOPipe::EStatus LLIOSocketWriter::process_impl(
}
}
+ buffer->unlock();
+
PUMP_DEBUG;
if(done && eos)
{
diff --git a/indra/llmessage/llpumpio.cpp b/indra/llmessage/llpumpio.cpp
index a8d2a0a224..f3ef4f2684 100644
--- a/indra/llmessage/llpumpio.cpp
+++ b/indra/llmessage/llpumpio.cpp
@@ -195,7 +195,7 @@ bool LLPumpIO::prime(apr_pool_t* pool)
return ((pool == NULL) ? false : true);
}
-bool LLPumpIO::addChain(const chain_t& chain, F32 timeout)
+bool LLPumpIO::addChain(const chain_t& chain, F32 timeout, bool has_curl_request)
{
LLMemType m1(LLMemType::MTYPE_IO_PUMP);
if(chain.empty()) return false;
@@ -204,8 +204,10 @@ bool LLPumpIO::addChain(const chain_t& chain, F32 timeout)
LLScopedLock lock(mChainsMutex);
#endif
LLChainInfo info;
+ info.mHasCurlRequest = has_curl_request;
info.setTimeoutSeconds(timeout);
info.mData = LLIOPipe::buffer_ptr_t(new LLBufferArray);
+ info.mData->setThreaded(has_curl_request);
LLLinkInfo link;
#if LL_DEBUG_PIPE_TYPE_IN_PUMP
lldebugs << "LLPumpIO::addChain() " << chain[0] << " '"
@@ -440,6 +442,15 @@ void LLPumpIO::pump()
static LLFastTimer::DeclareTimer FTM_PUMP_IO("Pump IO");
+LLPumpIO::current_chain_t LLPumpIO::removeRunningChain(LLPumpIO::current_chain_t& run_chain)
+{
+ std::for_each(
+ (*run_chain).mDescriptors.begin(),
+ (*run_chain).mDescriptors.end(),
+ ll_delete_apr_pollset_fd_client_data());
+ return mRunningChains.erase(run_chain);
+}
+
//timeout is in microseconds
void LLPumpIO::pump(const S32& poll_timeout)
{
@@ -585,10 +596,16 @@ void LLPumpIO::pump(const S32& poll_timeout)
// << (*run_chain).mChainLinks[0].mPipe
// << " because we reached the end." << llendl;
#endif
- run_chain = mRunningChains.erase(run_chain);
+ run_chain = removeRunningChain(run_chain);
continue;
}
}
+ else if(isChainExpired(*run_chain))
+ {
+ run_chain = removeRunningChain(run_chain);
+ continue;
+ }
+
PUMP_DEBUG;
if((*run_chain).mLock)
{
@@ -696,11 +713,7 @@ void LLPumpIO::pump(const S32& poll_timeout)
PUMP_DEBUG;
// This chain is done. Clean up any allocated memory and
// erase the chain info.
- std::for_each(
- (*run_chain).mDescriptors.begin(),
- (*run_chain).mDescriptors.end(),
- ll_delete_apr_pollset_fd_client_data());
- run_chain = mRunningChains.erase(run_chain);
+ run_chain = removeRunningChain(run_chain);
// *NOTE: may not always need to rebuild the pollset.
mRebuildPollset = true;
@@ -1095,6 +1108,24 @@ void LLPumpIO::processChain(LLChainInfo& chain)
PUMP_DEBUG;
}
+bool LLPumpIO::isChainExpired(LLChainInfo& chain)
+{
+ if(!chain.mHasCurlRequest)
+ {
+ return false ;
+ }
+
+ for(links_t::iterator iter = chain.mChainLinks.begin(); iter != chain.mChainLinks.end(); ++iter)
+ {
+ if(!(*iter).mPipe->isValid())
+ {
+ return true ;
+ }
+ }
+
+ return false ;
+}
+
bool LLPumpIO::handleChainError(
LLChainInfo& chain,
LLIOPipe::EStatus error)
@@ -1136,6 +1167,9 @@ bool LLPumpIO::handleChainError(
#endif
keep_going = false;
break;
+ case LLIOPipe::STATUS_EXPIRED:
+ keep_going = false;
+ break ;
default:
if(LLIOPipe::isSuccess(error))
{
@@ -1157,7 +1191,8 @@ bool LLPumpIO::handleChainError(
LLPumpIO::LLChainInfo::LLChainInfo() :
mInit(false),
mLock(0),
- mEOS(false)
+ mEOS(false),
+ mHasCurlRequest(false)
{
LLMemType m1(LLMemType::MTYPE_IO_PUMP);
mTimer.setTimerExpirySec(DEFAULT_CHAIN_EXPIRY_SECS);
diff --git a/indra/llmessage/llpumpio.h b/indra/llmessage/llpumpio.h
index 9303c9d7fc..d2c5d37571 100644
--- a/indra/llmessage/llpumpio.h
+++ b/indra/llmessage/llpumpio.h
@@ -111,9 +111,10 @@ public:
* @param chain The pipes for the chain
* @param timeout The number of seconds in the future to
* expire. Pass in 0.0f to never expire.
+ * @param has_curl_request The chain contains LLURLRequest if true.
* @return Returns true if anything was added to the pump.
*/
- bool addChain(const chain_t& chain, F32 timeout);
+ bool addChain(const chain_t& chain, F32 timeout, bool has_curl_request = false);
/**
* @brief Struct to associate a pipe with it's buffer io indexes.
@@ -356,12 +357,13 @@ protected:
// basic member data
bool mInit;
+ bool mEOS;
+ bool mHasCurlRequest;
S32 mLock;
LLFrameTimer mTimer;
links_t::iterator mHead;
links_t mChainLinks;
- LLIOPipe::buffer_ptr_t mData;
- bool mEOS;
+ LLIOPipe::buffer_ptr_t mData;
LLSD mContext;
// tracking inside the pump
@@ -402,7 +404,7 @@ protected:
protected:
void initialize(apr_pool_t* pool);
void cleanup();
-
+ current_chain_t removeRunningChain(current_chain_t& chain) ;
/**
* @brief Given the internal state of the chains, rebuild the pollset
* @see setConditional()
@@ -429,6 +431,9 @@ protected:
*/
bool handleChainError(LLChainInfo& chain, LLIOPipe::EStatus error);
+ //if the chain is expired, remove it
+ bool isChainExpired(LLChainInfo& chain) ;
+
public:
/**
* @brief Return number of running chains.
diff --git a/indra/llmessage/llsdrpcclient.h b/indra/llmessage/llsdrpcclient.h
index 9fb49a5c33..0cecf4f688 100644
--- a/indra/llmessage/llsdrpcclient.h
+++ b/indra/llmessage/llsdrpcclient.h
@@ -240,9 +240,16 @@ public:
virtual bool build(LLPumpIO::chain_t& chain, LLSD context) const
{
lldebugs << "LLSDRPCClientFactory::build" << llendl;
- LLIOPipe::ptr_t service(new Client);
- chain.push_back(service);
LLURLRequest* http(new LLURLRequest(LLURLRequest::HTTP_POST));
+ if(!http->isValid())
+ {
+ llwarns << "Creating LLURLRequest failed." << llendl ;
+ delete http;
+ return false;
+ }
+
+ LLIOPipe::ptr_t service(new Client);
+ chain.push_back(service);
LLIOPipe::ptr_t http_pipe(http);
http->addHeader("Content-Type: text/llsd");
if(mURL.empty())
@@ -283,9 +290,16 @@ public:
virtual bool build(LLPumpIO::chain_t& chain, LLSD context) const
{
lldebugs << "LLXMLSDRPCClientFactory::build" << llendl;
- LLIOPipe::ptr_t service(new Client);
- chain.push_back(service);
+
LLURLRequest* http(new LLURLRequest(LLURLRequest::HTTP_POST));
+ if(!http->isValid())
+ {
+ llwarns << "Creating LLURLRequest failed." << llendl ;
+ delete http;
+ return false ;
+ }
+ LLIOPipe::ptr_t service(new Client);
+ chain.push_back(service);
LLIOPipe::ptr_t http_pipe(http);
http->addHeader("Content-Type: text/xml");
if(mURL.empty())
diff --git a/indra/llmessage/llurlrequest.cpp b/indra/llmessage/llurlrequest.cpp
index a3a2b2b1b8..a16f5c7bf0 100644
--- a/indra/llmessage/llurlrequest.cpp
+++ b/indra/llmessage/llurlrequest.cpp
@@ -64,7 +64,7 @@ public:
~LLURLRequestDetail();
std::string mURL;
LLCurlEasyRequest* mCurlRequest;
- LLBufferArray* mResponseBuffer;
+ LLIOPipe::buffer_ptr_t mResponseBuffer;
LLChannelDescriptors mChannels;
U8* mLastRead;
U32 mBodyLimit;
@@ -75,7 +75,6 @@ public:
LLURLRequestDetail::LLURLRequestDetail() :
mCurlRequest(NULL),
- mResponseBuffer(NULL),
mLastRead(NULL),
mBodyLimit(0),
mByteAccumulator(0),
@@ -84,13 +83,18 @@ LLURLRequestDetail::LLURLRequestDetail() :
{
LLMemType m1(LLMemType::MTYPE_IO_URL_REQUEST);
mCurlRequest = new LLCurlEasyRequest();
+
+ if(!mCurlRequest->isValid()) //failed.
+ {
+ delete mCurlRequest ;
+ mCurlRequest = NULL ;
+ }
}
LLURLRequestDetail::~LLURLRequestDetail()
{
LLMemType m1(LLMemType::MTYPE_IO_URL_REQUEST);
delete mCurlRequest;
- mResponseBuffer = NULL;
mLastRead = NULL;
}
@@ -252,12 +256,24 @@ void LLURLRequest::allowCookies()
mDetail->mCurlRequest->setoptString(CURLOPT_COOKIEFILE, "");
}
+//virtual
+bool LLURLRequest::isValid()
+{
+ return mDetail->mCurlRequest && mDetail->mCurlRequest->isValid();
+}
+
// virtual
LLIOPipe::EStatus LLURLRequest::handleError(
LLIOPipe::EStatus status,
LLPumpIO* pump)
{
LLMemType m1(LLMemType::MTYPE_IO_URL_REQUEST);
+
+ if(!isValid())
+ {
+ return STATUS_EXPIRED ;
+ }
+
if(mCompletionCallback && pump)
{
LLURLRequestComplete* complete = NULL;
@@ -326,7 +342,7 @@ LLIOPipe::EStatus LLURLRequest::process_impl(
// *FIX: bit of a hack, but it should work. The configure and
// callback method expect this information to be ready.
- mDetail->mResponseBuffer = buffer.get();
+ mDetail->mResponseBuffer = buffer;
mDetail->mChannels = channels;
if(!configure())
{
@@ -443,6 +459,12 @@ void LLURLRequest::initialize()
LLMemType m1(LLMemType::MTYPE_IO_URL_REQUEST);
mState = STATE_INITIALIZED;
mDetail = new LLURLRequestDetail;
+
+ if(!isValid())
+ {
+ return ;
+ }
+
mDetail->mCurlRequest->setopt(CURLOPT_NOSIGNAL, 1);
mDetail->mCurlRequest->setWriteCallback(&downCallback, (void*)this);
mDetail->mCurlRequest->setReadCallback(&upCallback, (void*)this);
diff --git a/indra/llmessage/llurlrequest.h b/indra/llmessage/llurlrequest.h
index ec5c2c1941..44d358d906 100644
--- a/indra/llmessage/llurlrequest.h
+++ b/indra/llmessage/llurlrequest.h
@@ -188,6 +188,8 @@ public:
*/
void allowCookies();
+ /*virtual*/ bool isValid() ;
+
public:
/**
* @brief Give this pipe a chance to handle a generated error
diff --git a/indra/llmessage/tests/llsdmessage_test.cpp b/indra/llmessage/tests/llsdmessage_test.cpp
index 0f2c069303..6871ac0d52 100644
--- a/indra/llmessage/tests/llsdmessage_test.cpp
+++ b/indra/llmessage/tests/llsdmessage_test.cpp
@@ -42,6 +42,7 @@
// external library headers
// other Linden headers
#include "../test/lltut.h"
+#include "../test/catch_and_store_what_in.h"
#include "llsdserialize.h"
#include "llevents.h"
#include "stringize.h"
@@ -72,43 +73,14 @@ namespace tut
template<> template<>
void llsdmessage_object::test<1>()
{
- bool threw = false;
+ std::string threw;
// This should fail...
try
{
LLSDMessage localListener;
}
- catch (const LLEventPump::DupPumpName&)
- {
- threw = true;
- }
- catch (const std::runtime_error& ex)
- {
- // This clause is because on Linux, on the viewer side, for this
- // one test program (though not others!), the
- // LLEventPump::DupPumpName exception isn't caught by the clause
- // above. Warn the user...
- std::cerr << "Failed to catch " << typeid(ex).name() << std::endl;
- // But if the expected exception was thrown, allow the test to
- // succeed anyway. Not sure how else to handle this odd case.
- if (std::string(typeid(ex).name()) == typeid(LLEventPump::DupPumpName).name())
- {
- threw = true;
- }
- else
- {
- // We don't even recognize this exception. Let it propagate
- // out to TUT to fail the test.
- throw;
- }
- }
- catch (...)
- {
- std::cerr << "Utterly failed to catch expected exception!" << std::endl;
- // This case is full of fail. We HAVE to address it.
- throw;
- }
- ensure("second LLSDMessage should throw", threw);
+ CATCH_AND_STORE_WHAT_IN(threw, LLEventPump::DupPumpName)
+ ensure("second LLSDMessage should throw", ! threw.empty());
}
template<> template<>