/**
* @file LLCoprocedurePool.cpp
* @author Rider Linden
* @brief Singleton class for managing asset uploads to the sim.
*
* $LicenseInfo:firstyear=2015&license=viewerlgpl$
* Second Life Viewer Source Code
* Copyright (C) 2015, Linden Research, Inc.
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation;
* version 2.1 of the License only.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
*
* Linden Research, Inc., 945 Battery Street, San Francisco, CA  94111  USA
* $/LicenseInfo$
*/

#include "llwin32headers.h"

#include "linden_common.h"

#include "llcoproceduremanager.h"

#include <chrono>

#include <boost/fiber/buffered_channel.hpp>

#include "llexception.h"
#include "stringize.h"

//=========================================================================
// Map of pool sizes for known pools
static const std::map<std::string, U32> DefaultPoolSizes{
	{std::string("Upload"),  1},
    {std::string("AIS"),     1},
    // *TODO: Rider for the moment keep AIS calls serialized otherwise the COF will tend to get out of sync.
};

static const U32 DEFAULT_POOL_SIZE = 5;
const U32 LLCoprocedureManager::DEFAULT_QUEUE_SIZE = 4096;

//=========================================================================
class LLCoprocedurePool: private boost::noncopyable
{
public:
    typedef LLCoprocedureManager::CoProcedure_t CoProcedure_t;

    LLCoprocedurePool(const std::string &name, size_t size);
    ~LLCoprocedurePool();

    /// Places the coprocedure on the queue for processing. 
    /// 
    /// @param name Is used for debugging and should identify this coroutine.
    /// @param proc Is a bound function to be executed 
    /// 
    /// @return This method returns a UUID that can be used later to cancel execution.
    LLUUID enqueueCoprocedure(const std::string &name, CoProcedure_t proc);

    /// Returns the number of coprocedures in the queue awaiting processing.
    ///
    inline size_t countPending() const
    {
        return mPending;
    }

    /// Returns the number of coprocedures actively being processed.
    ///
    inline size_t countActive() const
    {
        return mActiveCoprocsCount;
    }

    /// Returns the total number of coprocedures either queued or in active processing.
    ///
    inline S32 count() const
    {
        return countPending() + countActive();
    }

    void close();
    
private:
    struct QueuedCoproc
    {
        typedef boost::shared_ptr<QueuedCoproc> ptr_t;

        QueuedCoproc(const std::string &name, const LLUUID &id, CoProcedure_t proc) :
            mName(name),
            mId(id),
            mProc(proc)
        {}

        std::string mName;
        LLUUID mId;
        CoProcedure_t mProc;
    };

    // we use a buffered_channel here rather than unbuffered_channel since we want to be able to 
    // push values without blocking,even if there's currently no one calling a pop operation (due to
    // fiber running right now)
    typedef boost::fibers::buffered_channel<QueuedCoproc::ptr_t>  CoprocQueue_t;
    // Use shared_ptr to control the lifespan of our CoprocQueue_t instance
    // because the consuming coroutine might outlive this LLCoprocedurePool
    // instance.
    typedef boost::shared_ptr<CoprocQueue_t> CoprocQueuePtr;

    std::string     mPoolName;
    size_t          mPoolSize, mActiveCoprocsCount, mPending;
    CoprocQueuePtr  mPendingCoprocs;
    LLTempBoundListener mStatusListener;

    typedef std::map<std::string, LLCoreHttpUtil::HttpCoroutineAdapter::ptr_t> CoroAdapterMap_t;
    LLCore::HttpRequest::policy_t mHTTPPolicy;

    CoroAdapterMap_t mCoroMapping;

    void coprocedureInvokerCoro(CoprocQueuePtr pendingCoprocs,
                                LLCoreHttpUtil::HttpCoroutineAdapter::ptr_t httpAdapter);
};

//=========================================================================
LLCoprocedureManager::LLCoprocedureManager()
{
}

LLCoprocedureManager::~LLCoprocedureManager()
{
    close();
}

LLCoprocedureManager::poolPtr_t LLCoprocedureManager::initializePool(const std::string &poolName)
{
    // Attempt to look up a pool size in the configuration.  If found use that
    std::string keyName = "PoolSize" + poolName;
    int size = 0;

    LL_ERRS_IF(poolName.empty(), "CoprocedureManager") << "Poolname must not be empty" << LL_ENDL;

    if (mPropertyQueryFn)
    {
        size = mPropertyQueryFn(keyName);
    }

    if (size == 0)
    {
        // if not found grab the know default... if there is no known 
        // default use a reasonable number like 5.
        auto it = DefaultPoolSizes.find(poolName);
        size = (it != DefaultPoolSizes.end()) ? it->second : DEFAULT_POOL_SIZE;

        if (mPropertyDefineFn)
        {
            mPropertyDefineFn(keyName, size, "Coroutine Pool size for " + poolName);
        }

        LL_WARNS("CoProcMgr") << "LLCoprocedureManager: No setting for \"" << keyName << "\" setting pool size to default of " << size << LL_ENDL;
    }

    poolPtr_t pool(new LLCoprocedurePool(poolName, size));
    LL_ERRS_IF(!pool, "CoprocedureManager") << "Unable to create pool named \"" << poolName << "\" FATAL!" << LL_ENDL;

    bool inserted = mPoolMap.emplace(poolName, pool).second;
    LL_ERRS_IF(!inserted, "CoprocedureManager") << "Unable to add pool named \"" << poolName << "\" to map. FATAL!" << LL_ENDL;

    return pool;
}

//-------------------------------------------------------------------------
LLUUID LLCoprocedureManager::enqueueCoprocedure(const std::string &pool, const std::string &name, CoProcedure_t proc)
{
    // Attempt to find the pool and enqueue the procedure.  If the pool does 
    // not exist, create it.
    poolMap_t::iterator it = mPoolMap.find(pool);

    poolPtr_t targetPool = (it != mPoolMap.end()) ? it->second : initializePool(pool);

    return targetPool->enqueueCoprocedure(name, proc);
}

void LLCoprocedureManager::setPropertyMethods(SettingQuery_t queryfn, SettingUpdate_t updatefn)
{
    // functions to discover and store the pool sizes
    mPropertyQueryFn = queryfn;
    mPropertyDefineFn = updatefn;

    // workaround until we get mutex into initializePool
    initializePool("AssetStorage");
    initializePool("Upload");
}

//-------------------------------------------------------------------------
size_t LLCoprocedureManager::countPending() const
{
    size_t count = 0;
    for (const auto& pair : mPoolMap)
    {
        count += pair.second->countPending();
    }
    return count;
}

size_t LLCoprocedureManager::countPending(const std::string &pool) const
{
    poolMap_t::const_iterator it = mPoolMap.find(pool);

    if (it == mPoolMap.end())
        return 0;
    return it->second->countPending();
}

size_t LLCoprocedureManager::countActive() const
{
    size_t count = 0;
    for (poolMap_t::const_iterator it = mPoolMap.begin(); it != mPoolMap.end(); ++it)
    {
        count += it->second->countActive();
    }
    return count;
}

size_t LLCoprocedureManager::countActive(const std::string &pool) const
{
    poolMap_t::const_iterator it = mPoolMap.find(pool);

    if (it == mPoolMap.end())
    {
        return 0;
    }
    return it->second->countActive();
}

size_t LLCoprocedureManager::count() const
{
    size_t count = 0;
    for (const auto& pair : mPoolMap)
    {
        count += pair.second->count();
    }
    return count;
}

size_t LLCoprocedureManager::count(const std::string &pool) const
{
    poolMap_t::const_iterator it = mPoolMap.find(pool);

    if (it == mPoolMap.end())
        return 0;
    return it->second->count();
}

void LLCoprocedureManager::close()
{
    for(auto & poolEntry : mPoolMap)
    {
        poolEntry.second->close();
    }
}

void LLCoprocedureManager::close(const std::string &pool)
{
    poolMap_t::iterator it = mPoolMap.find(pool);
    if (it != mPoolMap.end())
    {
        it->second->close();
    }
}

//=========================================================================
LLCoprocedurePool::LLCoprocedurePool(const std::string &poolName, size_t size):
    mPoolName(poolName),
    mPoolSize(size),
    mActiveCoprocsCount(0),
    mPending(0),
    mPendingCoprocs(boost::make_shared<CoprocQueue_t>(LLCoprocedureManager::DEFAULT_QUEUE_SIZE)),
    mHTTPPolicy(LLCore::HttpRequest::DEFAULT_POLICY_ID),
    mCoroMapping()
{
    try
    {
        // store in our LLTempBoundListener so that when the LLCoprocedurePool is
        // destroyed, we implicitly disconnect from this LLEventPump
        // Monitores application status
        mStatusListener = LLEventPumps::instance().obtain("LLApp").listen(
            poolName + "_pool", // Make sure it won't repeat names from lleventcoro
            [pendingCoprocs = mPendingCoprocs, poolName](const LLSD& status)
        {
            auto& statsd = status["status"];
            if (statsd.asString() != "running")
            {
                LL_INFOS("CoProcMgr") << "Pool " << poolName
                                      << " closing queue because status " << statsd
                                      << LL_ENDL;
                // This should ensure that all waiting coprocedures in this
                // pool will wake up and terminate.
                pendingCoprocs->close();
            }
            return false;
        });
    }
    catch (const LLEventPump::DupListenerName &)
    {
        // This shounldn't be possible since LLCoprocedurePool is supposed to have unique names,
        // yet it somehow did happen, as result pools got '_pool' suffix and this catch.
        //
        // If this somehow happens again it is better to crash later on shutdown due to pump
        // not stopping coroutine and see warning in logs than on startup or during login.
        LL_WARNS("CoProcMgr") << "Attempted to register dupplicate listener name: " << poolName
                              << "_pool. Failed to start listener." << LL_ENDL;

        llassert(0); // Fix Me! Ignoring missing listener!
    }

    for (size_t count = 0; count < mPoolSize; ++count)
    {
        LLCoreHttpUtil::HttpCoroutineAdapter::ptr_t httpAdapter(new LLCoreHttpUtil::HttpCoroutineAdapter( mPoolName + "Adapter", mHTTPPolicy));

        std::string pooledCoro = LLCoros::instance().launch(
            "LLCoprocedurePool("+mPoolName+")::coprocedureInvokerCoro",
            boost::bind(&LLCoprocedurePool::coprocedureInvokerCoro, this,
                        mPendingCoprocs, httpAdapter));

        mCoroMapping.insert(CoroAdapterMap_t::value_type(pooledCoro, httpAdapter));
    }

    LL_INFOS("CoProcMgr") << "Created coprocedure pool named \"" << mPoolName << "\" with " << size << " items, queue max " << LLCoprocedureManager::DEFAULT_QUEUE_SIZE << LL_ENDL;
}

LLCoprocedurePool::~LLCoprocedurePool() 
{
}

//-------------------------------------------------------------------------
LLUUID LLCoprocedurePool::enqueueCoprocedure(const std::string &name, LLCoprocedurePool::CoProcedure_t proc)
{
    LLUUID id(LLUUID::generateNewID());

    LL_INFOS("CoProcMgr") << "Coprocedure(" << name << ") enqueuing with id=" << id.asString() << " in pool \"" << mPoolName << "\" at " << mPending << LL_ENDL;
    auto pushed = mPendingCoprocs->try_push(boost::make_shared<QueuedCoproc>(name, id, proc));
    if (pushed == boost::fibers::channel_op_status::success)
    {
        ++mPending;
        return id;
    }

    // Here we didn't succeed in pushing. Shutdown could be the reason.
    if (pushed == boost::fibers::channel_op_status::closed)
    {
        LL_WARNS("CoProcMgr") << "Discarding coprocedure '" << name << "' because shutdown" << LL_ENDL;
        return {};
    }

    // The queue should never fill up.
    LL_ERRS("CoProcMgr") << "Enqueue into '" << name << "' failed (" << unsigned(pushed) << ")" << LL_ENDL;
    return {};                      // never executed, pacify the compiler
}

//-------------------------------------------------------------------------
void LLCoprocedurePool::coprocedureInvokerCoro(
    CoprocQueuePtr pendingCoprocs,
    LLCoreHttpUtil::HttpCoroutineAdapter::ptr_t httpAdapter)
{
    for (;;)
    {
        // It is VERY IMPORTANT that we instantiate a new ptr_t just before
        // the pop_wait_for() call below. When this ptr_t was declared at
        // function scope (outside the for loop), NickyD correctly diagnosed a
        // mysterious hang condition due to:
        // - the second time through the loop, the ptr_t held the last pointer
        //   to the previous QueuedCoproc, which indirectly held the last
        //   LLPointer to an LLInventoryCallback instance
        // - while holding the lock on pendingCoprocs, pop_wait_for() assigned
        //   the popped value to the ptr_t variable
        // - assignment destroyed the previous value of that variable, which
        //   indirectly destroyed the LLInventoryCallback
        // - whose destructor called ~LLRequestServerAppearanceUpdateOnDestroy()
        // - which called LLAppearanceMgr::requestServerAppearanceUpdate()
        // - which called enqueueCoprocedure()
        // - which tried to acquire the lock on pendingCoprocs... alas.
        // Using a fresh, clean ptr_t ensures that no previous value is
        // destroyed during pop_wait_for().
        QueuedCoproc::ptr_t coproc;
        boost::fibers::channel_op_status status;
        {
            LLCoros::TempStatus st("waiting for work for 10s");
            status = pendingCoprocs->pop_wait_for(coproc, std::chrono::seconds(10));
        }
        if (status == boost::fibers::channel_op_status::closed)
        {
            break;
        }

        if(status == boost::fibers::channel_op_status::timeout)
        {
            LL_DEBUGS_ONCE("CoProcMgr") << "pool '" << mPoolName << "' waiting." << LL_ENDL;
            continue;
        }
        // we actually popped an item
        --mPending;
        mActiveCoprocsCount++;

        LL_DEBUGS("CoProcMgr") << "Dequeued and invoking coprocedure(" << coproc->mName << ") with id=" << coproc->mId.asString() << " in pool \"" << mPoolName << "\" (" << mPending << " left)" << LL_ENDL;

        try
        {
            coproc->mProc(httpAdapter, coproc->mId);
        }
        catch (const LLCoros::Stop &e)
        {
            LL_INFOS("LLCoros") << "coprocedureInvokerCoro terminating because "
                << e.what() << LL_ENDL;
            throw; // let toplevel handle this as LLContinueError
        }
        catch (...)
        {
            LOG_UNHANDLED_EXCEPTION(STRINGIZE("Coprocedure('" << coproc->mName
                                              << "', id=" << coproc->mId.asString()
                                              << ") in pool '" << mPoolName << "'"));
            // must NOT omit this or we deplete the pool
            mActiveCoprocsCount--;
            continue;
        }

        LL_DEBUGS("CoProcMgr") << "Finished coprocedure(" << coproc->mName << ")" << " in pool \"" << mPoolName << "\"" << LL_ENDL;

        mActiveCoprocsCount--;
    }
}

void LLCoprocedurePool::close()
{
    mPendingCoprocs->close();
}