diff options
Diffstat (limited to 'indra/llmessage/llcurl.cpp')
-rw-r--r-- | indra/llmessage/llcurl.cpp | 433 |
1 files changed, 284 insertions, 149 deletions
diff --git a/indra/llmessage/llcurl.cpp b/indra/llmessage/llcurl.cpp index 330028c926..f569630766 100644 --- a/indra/llmessage/llcurl.cpp +++ b/indra/llmessage/llcurl.cpp @@ -86,9 +86,7 @@ S32 gCurlMultiCount = 0; std::vector<LLMutex*> LLCurl::sSSLMutex; std::string LLCurl::sCAPath; std::string LLCurl::sCAFile; - -bool LLCurl::sMultiThreaded = false; -static U32 sMainThreadID = 0; +LLCurlThread* LLCurl::sCurlThread = NULL ; void check_curl_code(CURLcode code) { @@ -221,14 +219,15 @@ namespace boost std::set<CURL*> LLCurl::Easy::sFreeHandles; std::set<CURL*> LLCurl::Easy::sActiveHandles; -LLMutex* LLCurl::Easy::sHandleMutex = NULL; -LLMutex* LLCurl::Easy::sMultiMutex = NULL; +LLMutex* LLCurl::Easy::sHandleMutexp = NULL ; //static CURL* LLCurl::Easy::allocEasyHandle() { CURL* ret = NULL; - LLMutexLock lock(sHandleMutex); + + LLMutexLock lock(sHandleMutexp) ; + if (sFreeHandles.empty()) { ret = curl_easy_init(); @@ -256,8 +255,7 @@ void LLCurl::Easy::releaseEasyHandle(CURL* handle) llerrs << "handle cannot be NULL!" << llendl; } - LLMutexLock lock(sHandleMutex); - + LLMutexLock lock(sHandleMutexp) ; if (sActiveHandles.find(handle) != sActiveHandles.end()) { sActiveHandles.erase(handle); @@ -519,48 +517,38 @@ void LLCurl::Easy::prepRequest(const std::string& url, } //////////////////////////////////////////////////////////////////////////// - +LLMutex* LLCurl::Multi::sMultiInitMutexp = NULL ; LLCurl::Multi::Multi() - : LLThread("Curl Multi"), - mQueued(0), + : mQueued(0), mErrorCount(0), - mPerformState(PERFORM_STATE_READY) + mState(STATE_READY), + mDead(FALSE), + mMutexp(NULL), + mDeletionMutexp(NULL), + mEasyMutexp(NULL) { - mQuitting = false; - - mThreaded = LLCurl::sMultiThreaded && LLThread::currentID() == sMainThreadID; - if (mThreaded) - { - mSignal = new LLCondition(NULL); - } - else - { - mSignal = NULL; - } - - mCurlMultiHandle = curl_multi_init(); + mCurlMultiHandle = initMulti(); if (!mCurlMultiHandle) { llwarns << "curl_multi_init() returned NULL! Easy handles: " << gCurlEasyCount << " Multi handles: " << gCurlMultiCount << llendl; - mCurlMultiHandle = curl_multi_init(); + mCurlMultiHandle = initMulti(); } - llassert_always(mCurlMultiHandle); - ++gCurlMultiCount; -} - -LLCurl::Multi::~Multi() -{ - llassert(isStopped()); + llassert_always(mCurlMultiHandle); - if (LLCurl::sMultiThreaded) + if(LLCurl::getCurlThread()->getThreaded()) { - LLCurl::Easy::sMultiMutex->lock(); + mMutexp = new LLMutex(NULL) ; + mDeletionMutexp = new LLMutex(NULL) ; + mEasyMutexp = new LLMutex(NULL) ; } + LLCurl::getCurlThread()->addMulti(this) ; - delete mSignal; - mSignal = NULL; + ++gCurlMultiCount; +} +LLCurl::Multi::~Multi() +{ // Clean up active for(easy_active_list_t::iterator iter = mEasyActiveList.begin(); iter != mEasyActiveList.end(); ++iter) @@ -577,75 +565,137 @@ LLCurl::Multi::~Multi() mEasyFreeList.clear(); check_curl_multi_code(curl_multi_cleanup(mCurlMultiHandle)); + + delete mMutexp ; + mMutexp = NULL ; + delete mDeletionMutexp ; + mDeletionMutexp = NULL ; + delete mEasyMutexp ; + mEasyMutexp = NULL ; + --gCurlMultiCount; +} + +CURLM* LLCurl::Multi::initMulti() +{ + LLMutexLock lock(sMultiInitMutexp) ; + + return curl_multi_init() ; +} - if (LLCurl::sMultiThreaded) +void LLCurl::Multi::lock() +{ + if(mMutexp) { - LLCurl::Easy::sMultiMutex->unlock(); + mMutexp->lock() ; } } -CURLMsg* LLCurl::Multi::info_read(S32* msgs_in_queue) +void LLCurl::Multi::unlock() { - CURLMsg* curlmsg = curl_multi_info_read(mCurlMultiHandle, msgs_in_queue); - return curlmsg; + if(mMutexp) + { + mMutexp->unlock() ; + } +} + +void LLCurl::Multi::markDead() +{ + LLMutexLock lock(mDeletionMutexp) ; + + mDead = TRUE ; } -void LLCurl::Multi::perform() +void LLCurl::Multi::setState(LLCurl::Multi::ePerformState state) { - if (mThreaded) + lock() ; + mState = state ; + unlock() ; + + if(mState == STATE_READY) { - if (mPerformState == PERFORM_STATE_READY) - { - mSignal->signal(); - } + LLCurl::getCurlThread()->setPriority(mHandle, LLQueuedThread::PRIORITY_NORMAL) ; + } +} + +LLCurl::Multi::ePerformState LLCurl::Multi::getState() +{ + return mState; +} + +bool LLCurl::Multi::isCompleted() +{ + return STATE_COMPLETED == getState() ; +} + +bool LLCurl::Multi::waitToComplete() +{ + if(!mMutexp) //not threaded + { + doPerform() ; + return true ; } - else + + bool completed = (STATE_COMPLETED == mState) ; + if(!completed) { - doPerform(); + LLCurl::getCurlThread()->setPriority(mHandle, LLQueuedThread::PRIORITY_URGENT) ; } + + return completed; } -void LLCurl::Multi::run() +CURLMsg* LLCurl::Multi::info_read(S32* msgs_in_queue) { - llassert(mThreaded); + LLMutexLock lock(mMutexp) ; - while (!mQuitting) - { - mSignal->wait(); - mPerformState = PERFORM_STATE_PERFORMING; - if (!mQuitting) - { - LLMutexLock lock(LLCurl::Easy::sMultiMutex); - doPerform(); - } - } + CURLMsg* curlmsg = curl_multi_info_read(mCurlMultiHandle, msgs_in_queue); + return curlmsg; } -void LLCurl::Multi::doPerform() +//return true if dead +bool LLCurl::Multi::doPerform() { - S32 q = 0; - for (S32 call_count = 0; - call_count < MULTI_PERFORM_CALL_REPEAT; - call_count += 1) + LLMutexLock lock(mDeletionMutexp) ; + + bool dead = mDead ; + + if(mDead) { - CURLMcode code = curl_multi_perform(mCurlMultiHandle, &q); - if (CURLM_CALL_MULTI_PERFORM != code || q == 0) + setState(STATE_COMPLETED); + mQueued = 0 ; + } + else if(getState() != STATE_COMPLETED) + { + setState(STATE_PERFORMING); + + S32 q = 0; + for (S32 call_count = 0; + call_count < MULTI_PERFORM_CALL_REPEAT; + call_count++) { - check_curl_multi_code(code); - break; + LLMutexLock lock(mMutexp) ; + CURLMcode code = curl_multi_perform(mCurlMultiHandle, &q); + if (CURLM_CALL_MULTI_PERFORM != code || q == 0) + { + check_curl_multi_code(code); + + break; + } } - + + mQueued = q; + setState(STATE_COMPLETED) ; } - mQueued = q; - mPerformState = PERFORM_STATE_COMPLETED; + + return dead ; } S32 LLCurl::Multi::process() { - perform(); + waitToComplete() ; - if (mPerformState != PERFORM_STATE_COMPLETED) + if (getState() != STATE_COMPLETED) { return 0; } @@ -660,10 +710,19 @@ S32 LLCurl::Multi::process() if (msg->msg == CURLMSG_DONE) { U32 response = 0; - easy_active_map_t::iterator iter = mEasyActiveMap.find(msg->easy_handle); - if (iter != mEasyActiveMap.end()) + Easy* easy = NULL ; + + { + LLMutexLock lock(mEasyMutexp) ; + easy_active_map_t::iterator iter = mEasyActiveMap.find(msg->easy_handle); + if (iter != mEasyActiveMap.end()) + { + easy = iter->second; + } + } + + if(easy) { - Easy* easy = iter->second; response = easy->report(msg->data.result); removeEasy(easy); } @@ -681,25 +740,28 @@ S32 LLCurl::Multi::process() } } - mPerformState = PERFORM_STATE_READY; + setState(STATE_READY); + return processed; } LLCurl::Easy* LLCurl::Multi::allocEasy() { - Easy* easy = 0; + Easy* easy = 0; if (mEasyFreeList.empty()) - { + { easy = Easy::getEasy(); } else { + LLMutexLock lock(mEasyMutexp) ; easy = *(mEasyFreeList.begin()); mEasyFreeList.erase(easy); } if (easy) { + LLMutexLock lock(mEasyMutexp) ; mEasyActiveList.insert(easy); mEasyActiveMap[easy->getCurlHandle()] = easy; } @@ -708,6 +770,7 @@ LLCurl::Easy* LLCurl::Multi::allocEasy() bool LLCurl::Multi::addEasy(Easy* easy) { + LLMutexLock lock(mMutexp) ; CURLMcode mcode = curl_multi_add_handle(mCurlMultiHandle, easy->getCurlHandle()); check_curl_multi_code(mcode); //if (mcode != CURLM_OK) @@ -720,25 +783,131 @@ bool LLCurl::Multi::addEasy(Easy* easy) void LLCurl::Multi::easyFree(Easy* easy) { + if(mEasyMutexp) + { + mEasyMutexp->lock() ; + } + mEasyActiveList.erase(easy); mEasyActiveMap.erase(easy->getCurlHandle()); + if (mEasyFreeList.size() < EASY_HANDLE_POOL_SIZE) - { - easy->resetState(); + { mEasyFreeList.insert(easy); + + if(mEasyMutexp) + { + mEasyMutexp->unlock() ; + } + + easy->resetState(); } else { + if(mEasyMutexp) + { + mEasyMutexp->unlock() ; + } delete easy; } } void LLCurl::Multi::removeEasy(Easy* easy) { - check_curl_multi_code(curl_multi_remove_handle(mCurlMultiHandle, easy->getCurlHandle())); + { + LLMutexLock lock(mMutexp) ; + check_curl_multi_code(curl_multi_remove_handle(mCurlMultiHandle, easy->getCurlHandle())); + } easyFree(easy); } +//------------------------------------------------------------ +//LLCurlThread +LLCurlThread::CurlRequest::CurlRequest(handle_t handle, LLCurl::Multi* multi, LLCurlThread* curl_thread) : + LLQueuedThread::QueuedRequest(handle, LLQueuedThread::PRIORITY_NORMAL, FLAG_AUTO_COMPLETE), + mMulti(multi), + mCurlThread(curl_thread) +{ +} + +LLCurlThread::CurlRequest::~CurlRequest() +{ + if(mMulti) + { + mCurlThread->deleteMulti(mMulti) ; + mMulti = NULL ; + } +} + +bool LLCurlThread::CurlRequest::processRequest() +{ + bool completed = true ; + if(mMulti) + { + completed = mCurlThread->doMultiPerform(mMulti) ; + setPriority(LLQueuedThread::PRIORITY_LOW) ; + } + + return completed ; +} + +void LLCurlThread::CurlRequest::finishRequest(bool completed) +{ + mCurlThread->deleteMulti(mMulti) ; + mMulti = NULL ; +} + +LLCurlThread::LLCurlThread(bool threaded) : + LLQueuedThread("curlthread", threaded) +{ + if(!LLCurl::Multi::sMultiInitMutexp) + { + LLCurl::Multi::sMultiInitMutexp = new LLMutex(NULL) ; + } +} + +//virtual +LLCurlThread::~LLCurlThread() +{ + delete LLCurl::Multi::sMultiInitMutexp ; + LLCurl::Multi::sMultiInitMutexp = NULL ; +} + +S32 LLCurlThread::update(U32 max_time_ms) +{ + return LLQueuedThread::update(max_time_ms); +} + +void LLCurlThread::addMulti(LLCurl::Multi* multi) +{ + multi->mHandle = generateHandle() ; + + CurlRequest* req = new CurlRequest(multi->mHandle, multi, this) ; + + if (!addRequest(req)) + { + llwarns << "curl request added when the thread is quitted" << llendl; + } +} + +void LLCurlThread::killMulti(LLCurl::Multi* multi) +{ + multi->markDead() ; +} + +//private +bool LLCurlThread::doMultiPerform(LLCurl::Multi* multi) +{ + return multi->doPerform() ; +} + +//private +void LLCurlThread::deleteMulti(LLCurl::Multi* multi) +{ + delete multi ; +} +//------------------------------------------------------------ + //static std::string LLCurl::strerror(CURLcode errorcode) { @@ -753,39 +922,23 @@ LLCurlRequest::LLCurlRequest() : mActiveMulti(NULL), mActiveRequestCount(0) { - mThreadID = LLThread::currentID(); mProcessing = FALSE; } LLCurlRequest::~LLCurlRequest() { - llassert_always(mThreadID == LLThread::currentID()); - //stop all Multi handle background threads for (curlmulti_set_t::iterator iter = mMultiSet.begin(); iter != mMultiSet.end(); ++iter) { - LLCurl::Multi* multi = *iter; - multi->mQuitting = true; - if (multi->mThreaded) - { - while (!multi->isStopped()) - { - multi->mSignal->signal(); - apr_sleep(1000); - } - } + LLCurl::getCurlThread()->killMulti(*iter) ; } - for_each(mMultiSet.begin(), mMultiSet.end(), DeletePointer()); + mMultiSet.clear() ; } void LLCurlRequest::addMulti() { - llassert_always(mThreadID == LLThread::currentID()); LLCurl::Multi* multi = new LLCurl::Multi(); - if (multi->mThreaded) - { - multi->start(); - } + mMultiSet.insert(multi); mActiveMulti = multi; mActiveRequestCount = 0; @@ -901,7 +1054,6 @@ bool LLCurlRequest::post(const std::string& url, // Note: call once per frame S32 LLCurlRequest::process() { - llassert_always(mThreadID == LLThread::currentID()); S32 res = 0; mProcessing = TRUE; @@ -915,17 +1067,7 @@ S32 LLCurlRequest::process() if (multi != mActiveMulti && tres == 0 && multi->mQueued == 0) { mMultiSet.erase(curiter); - multi->mQuitting = true; - if (multi->mThreaded) - { - while (!multi->isStopped()) - { - multi->mSignal->signal(); - apr_sleep(1000); - } - } - - delete multi; + LLCurl::getCurlThread()->killMulti(multi); } } mProcessing = FALSE; @@ -934,7 +1076,6 @@ S32 LLCurlRequest::process() S32 LLCurlRequest::getQueued() { - llassert_always(mThreadID == LLThread::currentID()); S32 queued = 0; for (curlmulti_set_t::iterator iter = mMultiSet.begin(); iter != mMultiSet.end(); ) @@ -942,7 +1083,7 @@ S32 LLCurlRequest::getQueued() curlmulti_set_t::iterator curiter = iter++; LLCurl::Multi* multi = *curiter; queued += multi->mQueued; - if (multi->mPerformState != LLCurl::Multi::PERFORM_STATE_READY) + if (multi->getState() != LLCurl::Multi::STATE_READY) { ++queued; } @@ -959,10 +1100,7 @@ LLCurlEasyRequest::LLCurlEasyRequest() mResultReturned(false) { mMulti = new LLCurl::Multi(); - if (mMulti->mThreaded) - { - mMulti->start(); - } + mEasy = mMulti->allocEasy(); if (mEasy) { @@ -975,16 +1113,7 @@ LLCurlEasyRequest::LLCurlEasyRequest() LLCurlEasyRequest::~LLCurlEasyRequest() { - mMulti->mQuitting = true; - if (mMulti->mThreaded) - { - while (!mMulti->isStopped()) - { - mMulti->mSignal->signal(); - apr_sleep(1000); - } - } - delete mMulti; + LLCurl::getCurlThread()->killMulti(mMulti) ; } void LLCurlEasyRequest::setopt(CURLoption option, S32 value) @@ -1080,19 +1209,14 @@ void LLCurlEasyRequest::requestComplete() } } -void LLCurlEasyRequest::perform() -{ - mMulti->perform(); -} - // Usage: Call getRestult until it returns false (no more messages) bool LLCurlEasyRequest::getResult(CURLcode* result, LLCurl::TransferInfo* info) { - if (mMulti->mPerformState != LLCurl::Multi::PERFORM_STATE_COMPLETED) + if (!mMulti->isCompleted()) { //we're busy, try again later return false; } - mMulti->mPerformState = LLCurl::Multi::PERFORM_STATE_READY; + mMulti->setState(LLCurl::Multi::STATE_READY) ; if (!mEasy) { @@ -1180,8 +1304,6 @@ unsigned long LLCurl::ssl_thread_id(void) void LLCurl::initClass(bool multi_threaded) { - sMainThreadID = LLThread::currentID(); - sMultiThreaded = multi_threaded; // Do not change this "unless you are familiar with and mean to control // internal operations of libcurl" // - http://curl.haxx.se/libcurl/c/curl_global_init.html @@ -1189,9 +1311,6 @@ void LLCurl::initClass(bool multi_threaded) check_curl_code(code); - Easy::sHandleMutex = new LLMutex(NULL); - Easy::sMultiMutex = new LLMutex(NULL); - #if SAFE_SSL S32 mutex_count = CRYPTO_num_locks(); for (S32 i=0; i<mutex_count; i++) @@ -1201,20 +1320,33 @@ void LLCurl::initClass(bool multi_threaded) CRYPTO_set_id_callback(&LLCurl::ssl_thread_id); CRYPTO_set_locking_callback(&LLCurl::ssl_locking_callback); #endif + + sCurlThread = new LLCurlThread(multi_threaded) ; + if(multi_threaded) + { + Easy::sHandleMutexp = new LLMutex(NULL) ; + } } void LLCurl::cleanupClass() { + //shut down curl thread + while(1) + { + if(!sCurlThread->update(1)) //finish all tasks + { + break ; + } + } + sCurlThread->shutdown() ; + delete sCurlThread ; + sCurlThread = NULL ; + #if SAFE_SSL CRYPTO_set_locking_callback(NULL); for_each(sSSLMutex.begin(), sSSLMutex.end(), DeletePointer()); #endif - delete Easy::sHandleMutex; - Easy::sHandleMutex = NULL; - delete Easy::sMultiMutex; - Easy::sMultiMutex = NULL; - for (std::set<CURL*>::iterator iter = Easy::sFreeHandles.begin(); iter != Easy::sFreeHandles.end(); ++iter) { CURL* curl = *iter; @@ -1223,6 +1355,9 @@ void LLCurl::cleanupClass() Easy::sFreeHandles.clear(); + delete Easy::sHandleMutexp ; + Easy::sHandleMutexp = NULL ; + llassert(Easy::sActiveHandles.empty()); } |