summaryrefslogtreecommitdiff
path: root/indra/llmessage/llcurl.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'indra/llmessage/llcurl.cpp')
-rw-r--r--indra/llmessage/llcurl.cpp356
1 files changed, 220 insertions, 136 deletions
diff --git a/indra/llmessage/llcurl.cpp b/indra/llmessage/llcurl.cpp
index 330028c926..7f61e1ac04 100644
--- a/indra/llmessage/llcurl.cpp
+++ b/indra/llmessage/llcurl.cpp
@@ -86,9 +86,7 @@ S32 gCurlMultiCount = 0;
std::vector<LLMutex*> LLCurl::sSSLMutex;
std::string LLCurl::sCAPath;
std::string LLCurl::sCAFile;
-
-bool LLCurl::sMultiThreaded = false;
-static U32 sMainThreadID = 0;
+LLCurlThread* LLCurl::sCurlThread = NULL ;
void check_curl_code(CURLcode code)
{
@@ -221,14 +219,11 @@ namespace boost
std::set<CURL*> LLCurl::Easy::sFreeHandles;
std::set<CURL*> LLCurl::Easy::sActiveHandles;
-LLMutex* LLCurl::Easy::sHandleMutex = NULL;
-LLMutex* LLCurl::Easy::sMultiMutex = NULL;
//static
CURL* LLCurl::Easy::allocEasyHandle()
{
CURL* ret = NULL;
- LLMutexLock lock(sHandleMutex);
if (sFreeHandles.empty())
{
ret = curl_easy_init();
@@ -256,8 +251,6 @@ void LLCurl::Easy::releaseEasyHandle(CURL* handle)
llerrs << "handle cannot be NULL!" << llendl;
}
- LLMutexLock lock(sHandleMutex);
-
if (sActiveHandles.find(handle) != sActiveHandles.end())
{
sActiveHandles.erase(handle);
@@ -521,23 +514,13 @@ void LLCurl::Easy::prepRequest(const std::string& url,
////////////////////////////////////////////////////////////////////////////
LLCurl::Multi::Multi()
- : LLThread("Curl Multi"),
- mQueued(0),
+ : mQueued(0),
mErrorCount(0),
- mPerformState(PERFORM_STATE_READY)
+ mState(STATE_READY),
+ mDead(FALSE),
+ mMutexp(NULL),
+ mDeletionMutexp(NULL)
{
- mQuitting = false;
-
- mThreaded = LLCurl::sMultiThreaded && LLThread::currentID() == sMainThreadID;
- if (mThreaded)
- {
- mSignal = new LLCondition(NULL);
- }
- else
- {
- mSignal = NULL;
- }
-
mCurlMultiHandle = curl_multi_init();
if (!mCurlMultiHandle)
{
@@ -545,22 +528,20 @@ LLCurl::Multi::Multi()
mCurlMultiHandle = curl_multi_init();
}
- llassert_always(mCurlMultiHandle);
- ++gCurlMultiCount;
-}
-
-LLCurl::Multi::~Multi()
-{
- llassert(isStopped());
+ llassert_always(mCurlMultiHandle);
- if (LLCurl::sMultiThreaded)
+ if(LLCurl::getCurlThread()->getThreaded())
{
- LLCurl::Easy::sMultiMutex->lock();
+ mMutexp = new LLMutex(NULL) ;
+ mDeletionMutexp = new LLMutex(NULL) ;
}
+ LLCurl::getCurlThread()->addMulti(this) ;
- delete mSignal;
- mSignal = NULL;
+ ++gCurlMultiCount;
+}
+LLCurl::Multi::~Multi()
+{
// Clean up active
for(easy_active_list_t::iterator iter = mEasyActiveList.begin();
iter != mEasyActiveList.end(); ++iter)
@@ -577,75 +558,149 @@ LLCurl::Multi::~Multi()
mEasyFreeList.clear();
check_curl_multi_code(curl_multi_cleanup(mCurlMultiHandle));
+
+ delete mMutexp ;
+ mMutexp = NULL ;
+ delete mDeletionMutexp ;
+ mDeletionMutexp = NULL ;
+
--gCurlMultiCount;
+}
- if (LLCurl::sMultiThreaded)
+void LLCurl::Multi::lock()
+{
+ if(mMutexp)
{
- LLCurl::Easy::sMultiMutex->unlock();
+ mMutexp->lock() ;
}
}
-CURLMsg* LLCurl::Multi::info_read(S32* msgs_in_queue)
+void LLCurl::Multi::unlock()
{
- CURLMsg* curlmsg = curl_multi_info_read(mCurlMultiHandle, msgs_in_queue);
- return curlmsg;
+ if(mMutexp)
+ {
+ mMutexp->unlock() ;
+ }
}
-void LLCurl::Multi::perform()
+void LLCurl::Multi::markDead()
{
- if (mThreaded)
+ if(mDeletionMutexp)
{
- if (mPerformState == PERFORM_STATE_READY)
- {
- mSignal->signal();
- }
+ mDeletionMutexp->lock() ;
}
- else
+
+ mDead = TRUE ;
+
+ if(mDeletionMutexp)
+ {
+ mDeletionMutexp->unlock() ;
+ }
+}
+
+void LLCurl::Multi::setState(LLCurl::Multi::ePerformState state)
+{
+ lock() ;
+ mState = state ;
+ if(mState == STATE_READY)
{
- doPerform();
+ LLCurl::getCurlThread()->setPriority(mHandle, LLQueuedThread::PRIORITY_NORMAL) ;
}
+ unlock() ;
}
-void LLCurl::Multi::run()
+LLCurl::Multi::ePerformState LLCurl::Multi::getState()
{
- llassert(mThreaded);
+ ePerformState state ;
+
+ lock() ;
+ state = mState ;
+ unlock() ;
+
+ return state ;
+}
+
+bool LLCurl::Multi::isCompleted()
+{
+ return STATE_COMPLETED == getState() ;
+}
- while (!mQuitting)
+bool LLCurl::Multi::waitToComplete()
+{
+ if(!mMutexp) //not threaded
{
- mSignal->wait();
- mPerformState = PERFORM_STATE_PERFORMING;
- if (!mQuitting)
- {
- LLMutexLock lock(LLCurl::Easy::sMultiMutex);
- doPerform();
- }
+ doPerform() ;
+ return true ;
+ }
+
+ bool completed ;
+
+ lock() ;
+ completed = (STATE_COMPLETED == mState) ;
+ if(!completed)
+ {
+ LLCurl::getCurlThread()->setPriority(mHandle, LLQueuedThread::PRIORITY_URGENT) ;
}
+ unlock() ;
+
+ return completed;
}
-void LLCurl::Multi::doPerform()
+CURLMsg* LLCurl::Multi::info_read(S32* msgs_in_queue)
{
- S32 q = 0;
- for (S32 call_count = 0;
- call_count < MULTI_PERFORM_CALL_REPEAT;
- call_count += 1)
+ CURLMsg* curlmsg = curl_multi_info_read(mCurlMultiHandle, msgs_in_queue);
+ return curlmsg;
+}
+
+//return true if dead
+bool LLCurl::Multi::doPerform()
+{
+ if(mDeletionMutexp)
+ {
+ mDeletionMutexp->lock() ;
+ }
+ bool dead = mDead ;
+
+ if(mDead)
{
- CURLMcode code = curl_multi_perform(mCurlMultiHandle, &q);
- if (CURLM_CALL_MULTI_PERFORM != code || q == 0)
+ setState(STATE_COMPLETED);
+ mQueued = 0 ;
+ }
+ else if(getState() != STATE_COMPLETED)
+ {
+ setState(STATE_PERFORMING);
+
+ S32 q = 0;
+ for (S32 call_count = 0;
+ call_count < MULTI_PERFORM_CALL_REPEAT;
+ call_count++)
{
- check_curl_multi_code(code);
- break;
+ CURLMcode code = curl_multi_perform(mCurlMultiHandle, &q);
+ if (CURLM_CALL_MULTI_PERFORM != code || q == 0)
+ {
+ check_curl_multi_code(code);
+
+ break;
+ }
}
-
+
+ mQueued = q;
+ setState(STATE_COMPLETED) ;
}
- mQueued = q;
- mPerformState = PERFORM_STATE_COMPLETED;
+
+ if(mDeletionMutexp)
+ {
+ mDeletionMutexp->unlock() ;
+ }
+
+ return dead ;
}
S32 LLCurl::Multi::process()
{
- perform();
+ waitToComplete() ;
- if (mPerformState != PERFORM_STATE_COMPLETED)
+ if (getState() != STATE_COMPLETED)
{
return 0;
}
@@ -681,7 +736,8 @@ S32 LLCurl::Multi::process()
}
}
- mPerformState = PERFORM_STATE_READY;
+ setState(STATE_READY);
+
return processed;
}
@@ -739,6 +795,75 @@ void LLCurl::Multi::removeEasy(Easy* easy)
easyFree(easy);
}
+//------------------------------------------------------------
+//LLCurlThread
+LLCurlThread::CurlRequest::CurlRequest(handle_t handle, LLCurl::Multi* multi) :
+ LLQueuedThread::QueuedRequest(handle, LLQueuedThread::PRIORITY_NORMAL, FLAG_AUTO_COMPLETE),
+ mMulti(multi)
+{
+}
+
+LLCurlThread::CurlRequest::~CurlRequest()
+{
+ if(mMulti)
+ {
+ delete mMulti ;
+ mMulti = NULL ;
+ }
+}
+
+bool LLCurlThread::CurlRequest::processRequest()
+{
+ bool completed = true ;
+ if(mMulti)
+ {
+ completed = mMulti->doPerform() ;
+ setPriority(LLQueuedThread::PRIORITY_LOW) ;
+ }
+
+ return completed ;
+}
+
+void LLCurlThread::CurlRequest::finishRequest(bool completed)
+{
+ delete mMulti ;
+ mMulti = NULL ;
+}
+
+LLCurlThread::LLCurlThread(bool threaded) :
+ LLQueuedThread("curlthread", threaded)
+{
+}
+
+//virtual
+LLCurlThread::~LLCurlThread()
+{
+}
+
+S32 LLCurlThread::update(U32 max_time_ms)
+{
+ return LLQueuedThread::update(max_time_ms);
+}
+
+void LLCurlThread::addMulti(LLCurl::Multi* multi)
+{
+ multi->mHandle = generateHandle() ;
+
+ CurlRequest* req = new CurlRequest(multi->mHandle, multi) ;
+
+ if (!addRequest(req))
+ {
+ llwarns << "curl request added when the thread is quitted" << llendl;
+ }
+}
+
+void LLCurlThread::deleteMulti(LLCurl::Multi* multi)
+{
+ multi->markDead() ;
+}
+
+//------------------------------------------------------------
+
//static
std::string LLCurl::strerror(CURLcode errorcode)
{
@@ -753,39 +878,23 @@ LLCurlRequest::LLCurlRequest() :
mActiveMulti(NULL),
mActiveRequestCount(0)
{
- mThreadID = LLThread::currentID();
mProcessing = FALSE;
}
LLCurlRequest::~LLCurlRequest()
{
- llassert_always(mThreadID == LLThread::currentID());
-
//stop all Multi handle background threads
for (curlmulti_set_t::iterator iter = mMultiSet.begin(); iter != mMultiSet.end(); ++iter)
{
- LLCurl::Multi* multi = *iter;
- multi->mQuitting = true;
- if (multi->mThreaded)
- {
- while (!multi->isStopped())
- {
- multi->mSignal->signal();
- apr_sleep(1000);
- }
- }
+ LLCurl::getCurlThread()->deleteMulti(*iter) ;
}
- for_each(mMultiSet.begin(), mMultiSet.end(), DeletePointer());
+ mMultiSet.clear() ;
}
void LLCurlRequest::addMulti()
{
- llassert_always(mThreadID == LLThread::currentID());
LLCurl::Multi* multi = new LLCurl::Multi();
- if (multi->mThreaded)
- {
- multi->start();
- }
+
mMultiSet.insert(multi);
mActiveMulti = multi;
mActiveRequestCount = 0;
@@ -901,7 +1010,6 @@ bool LLCurlRequest::post(const std::string& url,
// Note: call once per frame
S32 LLCurlRequest::process()
{
- llassert_always(mThreadID == LLThread::currentID());
S32 res = 0;
mProcessing = TRUE;
@@ -915,17 +1023,7 @@ S32 LLCurlRequest::process()
if (multi != mActiveMulti && tres == 0 && multi->mQueued == 0)
{
mMultiSet.erase(curiter);
- multi->mQuitting = true;
- if (multi->mThreaded)
- {
- while (!multi->isStopped())
- {
- multi->mSignal->signal();
- apr_sleep(1000);
- }
- }
-
- delete multi;
+ LLCurl::getCurlThread()->deleteMulti(multi);
}
}
mProcessing = FALSE;
@@ -934,7 +1032,6 @@ S32 LLCurlRequest::process()
S32 LLCurlRequest::getQueued()
{
- llassert_always(mThreadID == LLThread::currentID());
S32 queued = 0;
for (curlmulti_set_t::iterator iter = mMultiSet.begin();
iter != mMultiSet.end(); )
@@ -942,7 +1039,7 @@ S32 LLCurlRequest::getQueued()
curlmulti_set_t::iterator curiter = iter++;
LLCurl::Multi* multi = *curiter;
queued += multi->mQueued;
- if (multi->mPerformState != LLCurl::Multi::PERFORM_STATE_READY)
+ if (multi->getState() != LLCurl::Multi::STATE_READY)
{
++queued;
}
@@ -959,10 +1056,7 @@ LLCurlEasyRequest::LLCurlEasyRequest()
mResultReturned(false)
{
mMulti = new LLCurl::Multi();
- if (mMulti->mThreaded)
- {
- mMulti->start();
- }
+
mEasy = mMulti->allocEasy();
if (mEasy)
{
@@ -975,16 +1069,7 @@ LLCurlEasyRequest::LLCurlEasyRequest()
LLCurlEasyRequest::~LLCurlEasyRequest()
{
- mMulti->mQuitting = true;
- if (mMulti->mThreaded)
- {
- while (!mMulti->isStopped())
- {
- mMulti->mSignal->signal();
- apr_sleep(1000);
- }
- }
- delete mMulti;
+ LLCurl::getCurlThread()->deleteMulti(mMulti) ;
}
void LLCurlEasyRequest::setopt(CURLoption option, S32 value)
@@ -1080,19 +1165,14 @@ void LLCurlEasyRequest::requestComplete()
}
}
-void LLCurlEasyRequest::perform()
-{
- mMulti->perform();
-}
-
// Usage: Call getRestult until it returns false (no more messages)
bool LLCurlEasyRequest::getResult(CURLcode* result, LLCurl::TransferInfo* info)
{
- if (mMulti->mPerformState != LLCurl::Multi::PERFORM_STATE_COMPLETED)
+ if (!mMulti->isCompleted())
{ //we're busy, try again later
return false;
}
- mMulti->mPerformState = LLCurl::Multi::PERFORM_STATE_READY;
+ mMulti->setState(LLCurl::Multi::STATE_READY) ;
if (!mEasy)
{
@@ -1180,8 +1260,6 @@ unsigned long LLCurl::ssl_thread_id(void)
void LLCurl::initClass(bool multi_threaded)
{
- sMainThreadID = LLThread::currentID();
- sMultiThreaded = multi_threaded;
// Do not change this "unless you are familiar with and mean to control
// internal operations of libcurl"
// - http://curl.haxx.se/libcurl/c/curl_global_init.html
@@ -1189,9 +1267,6 @@ void LLCurl::initClass(bool multi_threaded)
check_curl_code(code);
- Easy::sHandleMutex = new LLMutex(NULL);
- Easy::sMultiMutex = new LLMutex(NULL);
-
#if SAFE_SSL
S32 mutex_count = CRYPTO_num_locks();
for (S32 i=0; i<mutex_count; i++)
@@ -1201,20 +1276,29 @@ void LLCurl::initClass(bool multi_threaded)
CRYPTO_set_id_callback(&LLCurl::ssl_thread_id);
CRYPTO_set_locking_callback(&LLCurl::ssl_locking_callback);
#endif
+
+ sCurlThread = new LLCurlThread(multi_threaded) ;
}
void LLCurl::cleanupClass()
{
+ //shut down curl thread
+ while(1)
+ {
+ if(!sCurlThread->update(1)) //finish all tasks
+ {
+ break ;
+ }
+ }
+ sCurlThread->shutdown() ;
+ delete sCurlThread ;
+ sCurlThread = NULL ;
+
#if SAFE_SSL
CRYPTO_set_locking_callback(NULL);
for_each(sSSLMutex.begin(), sSSLMutex.end(), DeletePointer());
#endif
- delete Easy::sHandleMutex;
- Easy::sHandleMutex = NULL;
- delete Easy::sMultiMutex;
- Easy::sMultiMutex = NULL;
-
for (std::set<CURL*>::iterator iter = Easy::sFreeHandles.begin(); iter != Easy::sFreeHandles.end(); ++iter)
{
CURL* curl = *iter;