/** 
 * @file llqueuedthread.cpp
 *
 * $LicenseInfo:firstyear=2004&license=viewerlgpl$
 * Second Life Viewer Source Code
 * Copyright (C) 2010, Linden Research, Inc.
 * 
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation;
 * version 2.1 of the License only.
 * 
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Lesser General Public License for more details.
 * 
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
 * 
 * Linden Research, Inc., 945 Battery Street, San Francisco, CA  94111  USA
 * $/LicenseInfo$
 */

#include "linden_common.h"
#include "llqueuedthread.h"

#include <chrono>

#include "llstl.h"
#include "lltimer.h"	// ms_sleep()
#include "llmutex.h"

//============================================================================

// MAIN THREAD
LLQueuedThread::LLQueuedThread(const std::string& name, bool threaded, bool should_pause) :
    LLThread(name),
    mIdleThread(TRUE),
    mNextHandle(0),
    mStarted(FALSE),
    mThreaded(threaded),
    mRequestQueue(name, 1024 * 1024)
{
    llassert(threaded); // not threaded implementation is deprecated
    mMainQueue = LL::WorkQueue::getInstance("mainloop");

	if (mThreaded)
	{
		if(should_pause)
		{
			pause() ; //call this before start the thread.
		}

		start();
	}
}

// MAIN THREAD
LLQueuedThread::~LLQueuedThread()
{
	if (!mThreaded)
	{
		endThread();
	}
	shutdown();
	// ~LLThread() will be called here
}

void LLQueuedThread::shutdown()
{
	setQuitting();

	unpause(); // MAIN THREAD
	if (mThreaded)
	{
        if (mRequestQueue.size() == 0)
        {
            mRequestQueue.close();
        }

		S32 timeout = 100;
		for ( ; timeout>0; timeout--)
		{
			if (isStopped())
			{
				break;
			}
			ms_sleep(100);
			LLThread::yield();
		}
		if (timeout == 0)
		{
			LL_WARNS() << "~LLQueuedThread (" << mName << ") timed out!" << LL_ENDL;
		}
	}
	else
	{
		mStatus = STOPPED;
	}

	QueuedRequest* req;
	S32 active_count = 0;
	while ( (req = (QueuedRequest*)mRequestHash.pop_element()) )
	{
		if (req->getStatus() == STATUS_QUEUED || req->getStatus() == STATUS_INPROGRESS)
		{
			++active_count;
			req->setStatus(STATUS_ABORTED); // avoid assert in deleteRequest
		}
		req->deleteRequest();
	}
	if (active_count)
	{
		LL_WARNS() << "~LLQueuedThread() called with active requests: " << active_count << LL_ENDL;
	}

    mRequestQueue.close();
}

//----------------------------------------------------------------------------

// MAIN THREAD
// virtual
S32 LLQueuedThread::update(F32 max_time_ms)
{
    LL_PROFILE_ZONE_SCOPED;
	if (!mStarted)
	{
		if (!mThreaded)
		{
			startThread();
			mStarted = TRUE;
		}
	}
	return updateQueue(max_time_ms);
}

S32 LLQueuedThread::updateQueue(F32 max_time_ms)
{
    LL_PROFILE_ZONE_SCOPED;
	// Frame Update
	if (mThreaded)
	{
        // schedule a call to threadedUpdate for every call to updateQueue
        if (!isQuitting())
        {
            mRequestQueue.postIfOpen([=]()
                {
                    LL_PROFILE_ZONE_NAMED_CATEGORY_THREAD("qt - update");
                    mIdleThread = FALSE;
                    threadedUpdate();
                    mIdleThread = TRUE;
                }
            );
        }

		if(getPending() > 0)
		{
		    unpause();
	    }
	}
	else
	{
        mRequestQueue.runFor(std::chrono::microseconds((int) (max_time_ms*1000.f)));
        threadedUpdate();
	}
	return getPending();
}

void LLQueuedThread::incQueue()
{
	// Something has been added to the queue
	if (!isPaused())
	{
		if (mThreaded)
		{
			wake(); // Wake the thread up if necessary.
		}
	}
}

//virtual
// May be called from any thread
S32 LLQueuedThread::getPending()
{
	return mRequestQueue.size();
}

// MAIN thread
void LLQueuedThread::waitOnPending()
{
	while(1)
	{
		update(0);

		if (mIdleThread)
		{
			break;
		}
		if (mThreaded)
		{
			yield();
		}
	}
	return;
}

// MAIN thread
void LLQueuedThread::printQueueStats()
{
    U32 size = mRequestQueue.size();
	if (size > 0)
	{
		LL_INFOS() << llformat("Pending Requests:%d ", mRequestQueue.size()) << LL_ENDL;
	}
	else
	{
		LL_INFOS() << "Queued Thread Idle" << LL_ENDL;
	}
}

// MAIN thread
LLQueuedThread::handle_t LLQueuedThread::generateHandle()
{
    U32 res = ++mNextHandle;
	return res;
}

// MAIN thread
bool LLQueuedThread::addRequest(QueuedRequest* req)
{
    LL_PROFILE_ZONE_SCOPED;
	if (mStatus == QUITTING)
	{
		return false;
	}
	
	lockData();
	req->setStatus(STATUS_QUEUED);
    mRequestHash.insert(req);
#if _DEBUG
// 	LL_INFOS() << llformat("LLQueuedThread::Added req [%08d]",handle) << LL_ENDL;
#endif
	unlockData();

    llassert(!mDataLock->isSelfLocked());
    mRequestQueue.post([this, req]() { processRequest(req); });

	return true;
}

// MAIN thread
bool LLQueuedThread::waitForResult(LLQueuedThread::handle_t handle, bool auto_complete)
{
    LL_PROFILE_ZONE_SCOPED;
	llassert (handle != nullHandle());
	bool res = false;
	bool waspaused = isPaused();
	bool done = false;
	while(!done)
	{
		update(0); // unpauses
		lockData();
		QueuedRequest* req = (QueuedRequest*)mRequestHash.find(handle);
		if (!req)
		{
			done = true; // request does not exist
		}
		else if (req->getStatus() == STATUS_COMPLETE)
		{
			res = true;
			if (auto_complete)
			{
				mRequestHash.erase(handle);
				req->deleteRequest();
// 				check();
			}
			done = true;
		}
		unlockData();
		
		if (!done && mThreaded)
		{
			yield();
		}
	}
	if (waspaused)
	{
		pause();
	}
	return res;
}

// MAIN thread
LLQueuedThread::QueuedRequest* LLQueuedThread::getRequest(handle_t handle)
{
	if (handle == nullHandle())
	{
		return 0;
	}
	lockData();
	QueuedRequest* res = (QueuedRequest*)mRequestHash.find(handle);
	unlockData();
	return res;
}

LLQueuedThread::status_t LLQueuedThread::getRequestStatus(handle_t handle)
{
	status_t res = STATUS_EXPIRED;
	lockData();
	QueuedRequest* req = (QueuedRequest*)mRequestHash.find(handle);
	if (req)
	{
		res = req->getStatus();
	}
	unlockData();
	return res;
}

void LLQueuedThread::abortRequest(handle_t handle, bool autocomplete)
{
    LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD;
	lockData();
	QueuedRequest* req = (QueuedRequest*)mRequestHash.find(handle);
	if (req)
	{
		req->setFlags(FLAG_ABORT | (autocomplete ? FLAG_AUTO_COMPLETE : 0));
	}
	unlockData();
}

// MAIN thread
void LLQueuedThread::setFlags(handle_t handle, U32 flags)
{
	lockData();
	QueuedRequest* req = (QueuedRequest*)mRequestHash.find(handle);
	if (req)
	{
		req->setFlags(flags);
	}
	unlockData();
}

bool LLQueuedThread::completeRequest(handle_t handle)
{
    LL_PROFILE_ZONE_SCOPED;
	bool res = false;
	lockData();
	QueuedRequest* req = (QueuedRequest*)mRequestHash.find(handle);
	if (req)
	{
		llassert_always(req->getStatus() != STATUS_QUEUED);
		llassert_always(req->getStatus() != STATUS_INPROGRESS);
#if _DEBUG
// 		LL_INFOS() << llformat("LLQueuedThread::Completed req [%08d]",handle) << LL_ENDL;
#endif
		mRequestHash.erase(handle);
		req->deleteRequest();
// 		check();
		res = true;
	}
	unlockData();
	return res;
}

bool LLQueuedThread::check()
{
#if 0 // not a reliable check once mNextHandle wraps, just for quick and dirty debugging
	for (int i=0; i<REQUEST_HASH_SIZE; i++)
	{
		LLSimpleHashEntry<handle_t>* entry = mRequestHash.get_element_at_index(i);
		while (entry)
		{
			if (entry->getHashKey() > mNextHandle)
			{
				LL_ERRS() << "Hash Error" << LL_ENDL;
				return false;
			}
			entry = entry->getNextEntry();
		}
	}
#endif
	return true;
}		
	
//============================================================================
// Runs on its OWN thread

void LLQueuedThread::processRequest(LLQueuedThread::QueuedRequest* req)
{
    LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD;

    mIdleThread = FALSE;
    //threadedUpdate();

	// Get next request from pool
	lockData();
	
	if ((req->getFlags() & FLAG_ABORT) || (mStatus == QUITTING))
	{
        LL_PROFILE_ZONE_NAMED_CATEGORY_THREAD("qtpr - abort");
		req->setStatus(STATUS_ABORTED);
		req->finishRequest(false);
		if (req->getFlags() & FLAG_AUTO_COMPLETE)
		{
			mRequestHash.erase(req);
			req->deleteRequest();
// 				check();
		}
        unlockData();
	}
    else
    {
        llassert_always(req->getStatus() == STATUS_QUEUED);

        if (req)
        {
            req->setStatus(STATUS_INPROGRESS);
        }
        unlockData();

        // This is the only place we will call req->setStatus() after
        // it has initially been seet to STATUS_QUEUED, so it is
        // safe to access req.
        if (req)
        {
            // process request		
            bool complete = req->processRequest();

            if (complete)
            {
                LL_PROFILE_ZONE_NAMED_CATEGORY_THREAD("qtpr - complete");
                lockData();
                req->setStatus(STATUS_COMPLETE);
                req->finishRequest(true);
                if (req->getFlags() & FLAG_AUTO_COMPLETE)
                {
                    mRequestHash.erase(req);
                    req->deleteRequest();
                    // 				check();
                }
                unlockData();
            }
            else
            {
                LL_PROFILE_ZONE_NAMED_CATEGORY_THREAD("qtpr - retry");
                //put back on queue and try again in 0.1ms
                lockData();
                req->setStatus(STATUS_QUEUED);
                
                unlockData();

                llassert(!mDataLock->isSelfLocked());

#if 0
                // try again on next frame
                // NOTE: tried using "post" with a time in the future, but this
                // would invariably cause this thread to wait for a long time (10+ ms)
                // while work is pending
                bool ret = LL::WorkQueue::postMaybe(
                    mMainQueue,
                    [=]()
                    {
                        LL_PROFILE_ZONE_NAMED("processRequest - retry");
                        mRequestQueue.post([=]()
                            {
                                LL_PROFILE_ZONE_NAMED("processRequest - retry"); // <-- not redundant, track retry on both queues
                                processRequest(req);
                            });
                    });
                llassert(ret);
#else
                using namespace std::chrono_literals;
                auto retry_time = LL::WorkQueue::TimePoint::clock::now() + 16ms;
                mRequestQueue.post([=]
                    {
                        LL_PROFILE_ZONE_NAMED("processRequest - retry");
                        if (LL::WorkQueue::TimePoint::clock::now() < retry_time)
                        {
                            auto sleep_time = std::chrono::duration_cast<std::chrono::milliseconds>(retry_time - LL::WorkQueue::TimePoint::clock::now());
                            
                            if (sleep_time.count() > 0) 
                            {
                                ms_sleep(sleep_time.count());
                            }
                        }
                        processRequest(req);
                    });
#endif
                
            }
        }
    }

    mIdleThread = TRUE;
}

// virtual
bool LLQueuedThread::runCondition()
{
	// mRunCondition must be locked here
	if (mRequestQueue.size() == 0 && mIdleThread)
		return false;
	else
		return true;
}

// virtual
void LLQueuedThread::run()
{
	// call checPause() immediately so we don't try to do anything before the class is fully constructed
	checkPause();
	startThread();
	mStarted = TRUE;
	

	/*while (1)
	{
        LL_PROFILE_ZONE_SCOPED;
		// this will block on the condition until runCondition() returns true, the thread is unpaused, or the thread leaves the RUNNING state.
		checkPause();
		
		mIdleThread = FALSE;

		threadedUpdate();
		
		int pending_work = processNextRequest();

		if (pending_work == 0)
		{
            //LL_PROFILE_ZONE_NAMED("LLQueuedThread - sleep");
			mIdleThread = TRUE;
			//ms_sleep(1);
		}
		//LLThread::yield(); // thread should yield after each request		
	}*/
    mRequestQueue.runUntilClose();

    endThread();
	LL_INFOS() << "LLQueuedThread " << mName << " EXITING." << LL_ENDL;

    
}

// virtual
void LLQueuedThread::startThread()
{
}

// virtual
void LLQueuedThread::endThread()
{
}

// virtual
void LLQueuedThread::threadedUpdate()
{
}

//============================================================================

LLQueuedThread::QueuedRequest::QueuedRequest(LLQueuedThread::handle_t handle, U32 flags) :
	LLSimpleHashEntry<LLQueuedThread::handle_t>(handle),
	mStatus(STATUS_UNKNOWN),
	mFlags(flags)
{
}

LLQueuedThread::QueuedRequest::~QueuedRequest()
{
	llassert_always(mStatus == STATUS_DELETE);
}

//virtual
void LLQueuedThread::QueuedRequest::finishRequest(bool completed)
{
}

//virtual
void LLQueuedThread::QueuedRequest::deleteRequest()
{
	llassert_always(mStatus != STATUS_INPROGRESS);
	setStatus(STATUS_DELETE);
	delete this;
}