/**
 * @file llthreadsafequeue.h
 * @brief Queue protected with mutexes for cross-thread use
 *
 * $LicenseInfo:firstyear=2004&license=viewerlgpl$
 * Second Life Viewer Source Code
 * Copyright (C) 2010, Linden Research, Inc.
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation;
 * version 2.1 of the License only.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
 *
 * Linden Research, Inc., 945 Battery Street, San Francisco, CA  94111  USA
 * $/LicenseInfo$
 */

#ifndef LL_LLTHREADSAFEQUEUE_H
#define LL_LLTHREADSAFEQUEUE_H

#include "llcoros.h"
#include LLCOROS_MUTEX_HEADER
#include <boost/fiber/timed_mutex.hpp>
#include LLCOROS_CONDVAR_HEADER
#include "llexception.h"
#include "mutex.h"
#include <chrono>
#include <queue>
#include <string>

/*****************************************************************************
*   LLThreadSafeQueue
*****************************************************************************/
//
// A general queue exception.
//
class LL_COMMON_API LLThreadSafeQueueError:
    public LLException
{
public:
    LLThreadSafeQueueError(std::string const & message):
        LLException(message)
    {
        ; // No op.
    }
};


//
// An exception raised when blocking operations are interrupted.
//
class LL_COMMON_API LLThreadSafeQueueInterrupt:
    public LLThreadSafeQueueError
{
public:
    LLThreadSafeQueueInterrupt(void):
        LLThreadSafeQueueError("queue operation interrupted")
    {
        ; // No op.
    }
};

/**
 * Implements a thread safe FIFO.
 */
// Let the default std::queue default to underlying std::deque. Override if
// desired.
template<typename ElementT, typename QueueT=std::queue<ElementT>>
class LLThreadSafeQueue
{
public:
    typedef ElementT value_type;

    // Limiting the number of pending items prevents unbounded growth of the
    // underlying queue.
    LLThreadSafeQueue(size_t capacity = 1024);
    virtual ~LLThreadSafeQueue() {}

    // Add an element to the queue (will block if the queue has reached
    // capacity).
    //
    // This call will raise an interrupt error if the queue is closed while
    // the caller is blocked.
    template <typename T>
    void push(T&& element);
    // legacy name
    void pushFront(ElementT const & element) { return push(element); }

    // Add an element to the queue (will block if the queue has reached
    // capacity). Return false if the queue is closed before push is possible.
    template <typename T>
    bool pushIfOpen(T&& element);

    // Try to add an element to the queue without blocking. Returns
    // true only if the element was actually added.
    template <typename T>
    bool tryPush(T&& element);
    // legacy name
    bool tryPushFront(ElementT const & element) { return tryPush(element); }

    // Try to add an element to the queue, blocking if full but with timeout
    // after specified duration. Returns true if the element was added.
    // There are potentially two different timeouts involved: how long to try
    // to lock the mutex, versus how long to wait for the queue to stop being
    // full. Careful settings for each timeout might be orders of magnitude
    // apart. However, this method conflates them.
    template <typename Rep, typename Period, typename T>
    bool tryPushFor(const std::chrono::duration<Rep, Period>& timeout,
                    T&& element);
    // legacy name
    template <typename Rep, typename Period>
    bool tryPushFrontFor(const std::chrono::duration<Rep, Period>& timeout,
                         ElementT const & element) { return tryPushFor(timeout, element); }

    // Try to add an element to the queue, blocking if full but with
    // timeout at specified time_point. Returns true if the element was added.
    template <typename Clock, typename Duration, typename T>
    bool tryPushUntil(const std::chrono::time_point<Clock, Duration>& until,
                      T&& element);
    // no legacy name because this is a newer method

    // Pop the element at the head of the queue (will block if the queue is
    // empty).
    //
    // This call will raise an interrupt error if the queue is closed while
    // the caller is blocked.
    ElementT pop(void);
    // legacy name
    ElementT popBack(void) { return pop(); }

    // Pop an element from the head of the queue if there is one available.
    // Returns true only if an element was popped.
    bool tryPop(ElementT & element);
    // legacy name
    bool tryPopBack(ElementT & element) { return tryPop(element); }

    // Pop the element at the head of the queue, blocking if empty, with
    // timeout after specified duration. Returns true if an element was popped.
    template <typename Rep, typename Period>
    bool tryPopFor(const std::chrono::duration<Rep, Period>& timeout, ElementT& element);
    // no legacy name because this is a newer method

    // Pop the element at the head of the queue, blocking if empty, with
    // timeout at specified time_point. Returns true if an element was popped.
    template <typename Clock, typename Duration>
    bool tryPopUntil(const std::chrono::time_point<Clock, Duration>& until,
                     ElementT& element);
    // no legacy name because this is a newer method

    // Returns the size of the queue.
    size_t size();

    //Returns the capacity of the queue.
    U32 capacity() { return mCapacity; }

    // closes the queue:
    // - every subsequent push() call will throw LLThreadSafeQueueInterrupt
    // - every subsequent tryPush() call will return false
    // - pop() calls will return normally until the queue is drained, then
    //   every subsequent pop() will throw LLThreadSafeQueueInterrupt
    // - tryPop() calls will return normally until the queue is drained,
    //   then every subsequent tryPop() call will return false
    void close();

    // producer end: are we prevented from pushing any additional items?
    bool isClosed();
    // consumer end: are we done, is the queue entirely drained?
    bool done();

protected:
    typedef QueueT queue_type;
    QueueT mStorage;
    size_t mCapacity;
    bool mClosed;

    boost::fibers::timed_mutex mLock;
    typedef std::unique_lock<decltype(mLock)> lock_t;
    boost::fibers::condition_variable_any mCapacityCond;
    boost::fibers::condition_variable_any mEmptyCond;

    enum pop_result { EMPTY, DONE, WAITING, POPPED };
    // implementation logic, suitable for passing to tryLockUntil()
    template <typename Clock, typename Duration>
    pop_result tryPopUntil_(lock_t& lock,
                            const std::chrono::time_point<Clock, Duration>& until,
                            ElementT& element);
    // if we're able to lock immediately, do so and run the passed callable,
    // which must accept lock_t& and return bool
    template <typename CALLABLE>
    bool tryLock(CALLABLE&& callable);
    // if we're able to lock before the passed time_point, do so and run the
    // passed callable, which must accept lock_t& and return bool
    template <typename Clock, typename Duration, typename CALLABLE>
    bool tryLockUntil(const std::chrono::time_point<Clock, Duration>& until,
                      CALLABLE&& callable);
    // while lock is locked, really push the passed element, if we can
    template <typename T>
    bool push_(lock_t& lock, T&& element);
    // while lock is locked, really pop the head element, if we can
    pop_result pop_(lock_t& lock, ElementT& element);
    // Is the current head element ready to pop? We say yes; subclass can
    // override as needed.
    virtual bool canPop(const ElementT& head) const { return true; }
};

/*****************************************************************************
*   PriorityQueueAdapter
*****************************************************************************/
namespace LL
{
    /**
     * std::priority_queue's API is almost like std::queue, intentionally of
     * course, but you must access the element about to pop() as top() rather
     * than as front(). Make an adapter for use with LLThreadSafeQueue.
     */
    template <typename T, typename Container=std::vector<T>,
              typename Compare=std::less<typename Container::value_type>>
    class PriorityQueueAdapter
    {
    public:
        // publish all the same types
        typedef std::priority_queue<T, Container, Compare> queue_type;
        typedef typename queue_type::container_type  container_type;
        typedef typename queue_type::value_compare   value_compare;
        typedef typename queue_type::value_type      value_type;
        typedef typename queue_type::size_type       size_type;
        typedef typename queue_type::reference       reference;
        typedef typename queue_type::const_reference const_reference;

        // Although std::queue defines both const and non-const front()
        // methods, std::priority_queue defines only const top().
        const_reference front() const { return mQ.top(); }
        // std::priority_queue has no equivalent to back(), so it's good that
        // LLThreadSafeQueue doesn't use it.

        // All the rest of these merely forward to the corresponding
        // queue_type methods.
        bool empty() const                 { return mQ.empty(); }
        size_type size() const             { return mQ.size(); }
        void push(const value_type& value) { mQ.push(value); }
        void push(value_type&& value)      { mQ.push(std::move(value)); }
        template <typename... Args>
        void emplace(Args&&... args)       { mQ.emplace(std::forward<Args>(args)...); }
        void pop()                         { mQ.pop(); }

    private:
        queue_type mQ;
    };
} // namespace LL


/*****************************************************************************
*   LLThreadSafeQueue implementation
*****************************************************************************/
template<typename ElementT, typename QueueT>
LLThreadSafeQueue<ElementT, QueueT>::LLThreadSafeQueue(size_t capacity) :
    mCapacity(capacity),
    mClosed(false)
{
}


// if we're able to lock immediately, do so and run the passed callable, which
// must accept lock_t& and return bool
template <typename ElementT, typename QueueT>
template <typename CALLABLE>
bool LLThreadSafeQueue<ElementT, QueueT>::tryLock(CALLABLE&& callable)
{
    LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD;
    lock_t lock1(mLock, std::defer_lock);
    if (!lock1.try_lock())
        return false;

    return std::forward<CALLABLE>(callable)(lock1);
}


// if we're able to lock before the passed time_point, do so and run the
// passed callable, which must accept lock_t& and return bool
template <typename ElementT, typename QueueT>
template <typename Clock, typename Duration, typename CALLABLE>
bool LLThreadSafeQueue<ElementT, QueueT>::tryLockUntil(
    const std::chrono::time_point<Clock, Duration>& until,
    CALLABLE&& callable)
{
    LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD;
    lock_t lock1(mLock, std::defer_lock);
    if (!lock1.try_lock_until(until))
        return false;

    return std::forward<CALLABLE>(callable)(lock1);
}


// while lock is locked, really push the passed element, if we can
template <typename ElementT, typename QueueT>
template <typename T>
bool LLThreadSafeQueue<ElementT, QueueT>::push_(lock_t& lock, T&& element)
{
    LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD;
    if (mStorage.size() >= mCapacity)
        return false;

    mStorage.push(std::forward<T>(element));
    lock.unlock();
    // now that we've pushed, if somebody's been waiting to pop, signal them
    mEmptyCond.notify_one();
    return true;
}


template <typename ElementT, typename QueueT>
template <typename T>
bool LLThreadSafeQueue<ElementT, QueueT>::pushIfOpen(T&& element)
{
    LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD;
    lock_t lock1(mLock);
    while (true)
    {
        // On the producer side, it doesn't matter whether the queue has been
        // drained or not: the moment either end calls close(), further push()
        // operations will fail.
        if (mClosed)
            return false;

        if (push_(lock1, std::forward<T>(element)))
            return true;

        // Storage Full. Wait for signal.
        mCapacityCond.wait(lock1);
    }
}


template <typename ElementT, typename QueueT>
template<typename T>
void LLThreadSafeQueue<ElementT, QueueT>::push(T&& element)
{
    LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD;
    if (! pushIfOpen(std::forward<T>(element)))
    {
        LLTHROW(LLThreadSafeQueueInterrupt());
    }
}


template<typename ElementT, typename QueueT>
template<typename T>
bool LLThreadSafeQueue<ElementT, QueueT>::tryPush(T&& element)
{
    LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD;
    return tryLock(
        [this, element=std::move(element)](lock_t& lock)
        {
            if (mClosed)
                return false;
            return push_(lock, std::move(element));
        });
}


template <typename ElementT, typename QueueT>
template <typename Rep, typename Period, typename T>
bool LLThreadSafeQueue<ElementT, QueueT>::tryPushFor(
    const std::chrono::duration<Rep, Period>& timeout,
    T&& element)
{
    LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD;
    // Convert duration to time_point: passing the same timeout duration to
    // each of multiple calls is wrong.
    return tryPushUntil(std::chrono::steady_clock::now() + timeout,
                        std::forward<T>(element));
}


template <typename ElementT, typename QueueT>
template <typename Clock, typename Duration, typename T>
bool LLThreadSafeQueue<ElementT, QueueT>::tryPushUntil(
    const std::chrono::time_point<Clock, Duration>& until,
    T&& element)
{
    LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD;
    return tryLockUntil(
        until,
        [this, until, element=std::move(element)](lock_t& lock)
        {
            while (true)
            {
                if (mClosed)
                {
                    return false;
                }

                if (push_(lock, std::move(element)))
                    return true;

                // Storage Full. Wait for signal.
                if (LLCoros::cv_status::timeout == mCapacityCond.wait_until(lock, until))
                {
                    // timed out -- formally we might recheck both conditions above
                    return false;
                }
                // If we didn't time out, we were notified for some reason. Loop back
                // to check.
            }
        });
}


// while lock is locked, really pop the head element, if we can
template <typename ElementT, typename QueueT>
typename LLThreadSafeQueue<ElementT, QueueT>::pop_result
LLThreadSafeQueue<ElementT, QueueT>::pop_(lock_t& lock, ElementT& element)
{
    LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD;
    // If mStorage is empty, there's no head element.
    if (mStorage.empty())
        return mClosed? DONE : EMPTY;

    // If there's a head element, pass it to canPop() to see if it's ready to pop.
    if (! canPop(mStorage.front()))
        return WAITING;

    // std::queue::front() is the element about to pop()
    element = mStorage.front();
    mStorage.pop();
    lock.unlock();
    // now that we've popped, if somebody's been waiting to push, signal them
    mCapacityCond.notify_one();
    return POPPED;
}


template<typename ElementT, typename QueueT>
ElementT LLThreadSafeQueue<ElementT, QueueT>::pop(void)
{
    LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD;
    lock_t lock1(mLock);
    ElementT value;
    while (true)
    {
        // On the consumer side, we always try to pop before checking mClosed
        // so we can finish draining the queue.
        pop_result popped = pop_(lock1, value);
        if (popped == POPPED)
            return std::move(value);

        // Once the queue is DONE, there will never be any more coming.
        if (popped == DONE)
        {
            LLTHROW(LLThreadSafeQueueInterrupt());
        }

        // If we didn't pop because WAITING, i.e. canPop() returned false,
        // then even if the producer end has been closed, there's still at
        // least one item to drain: wait for it. Or we might be EMPTY, with
        // the queue still open. Either way, wait for signal.
        mEmptyCond.wait(lock1);
    }
}


template<typename ElementT, typename QueueT>
bool LLThreadSafeQueue<ElementT, QueueT>::tryPop(ElementT & element)
{
    LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD;
    return tryLock(
        [this, &element](lock_t& lock)
        {
            // conflate EMPTY, DONE, WAITING: tryPop() behavior when the queue
            // is closed is implemented by simple inability to push any new
            // elements
            return pop_(lock, element) == POPPED;
        });
}


template <typename ElementT, typename QueueT>
template <typename Rep, typename Period>
bool LLThreadSafeQueue<ElementT, QueueT>::tryPopFor(
    const std::chrono::duration<Rep, Period>& timeout,
    ElementT& element)
{
    LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD;
    // Convert duration to time_point: passing the same timeout duration to
    // each of multiple calls is wrong.
    return tryPopUntil(std::chrono::steady_clock::now() + timeout, element);
}


template <typename ElementT, typename QueueT>
template <typename Clock, typename Duration>
bool LLThreadSafeQueue<ElementT, QueueT>::tryPopUntil(
    const std::chrono::time_point<Clock, Duration>& until,
    ElementT& element)
{
    LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD;
    return tryLockUntil(
        until,
        [this, until, &element](lock_t& lock)
        {
            // conflate EMPTY, DONE, WAITING
            return tryPopUntil_(lock, until, element) == POPPED;
        });
}


// body of tryPopUntil(), called once we have the lock
template <typename ElementT, typename QueueT>
template <typename Clock, typename Duration>
typename LLThreadSafeQueue<ElementT, QueueT>::pop_result
LLThreadSafeQueue<ElementT, QueueT>::tryPopUntil_(
    lock_t& lock,
    const std::chrono::time_point<Clock, Duration>& until,
    ElementT& element)
{
    LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD;
    while (true)
    {
        pop_result popped = pop_(lock, element);
        if (popped == POPPED || popped == DONE)
        {
            // If we succeeded, great! If we've drained the last item, so be
            // it. Either way, break the loop and tell caller.
            return popped;
        }

        // EMPTY or WAITING: wait for signal.
        if (LLCoros::cv_status::timeout == mEmptyCond.wait_until(lock, until))
        {
            // timed out -- formally we might recheck
            // as it is, break loop
            return popped;
        }
        // If we didn't time out, we were notified for some reason. Loop back
        // to check.
    }
}


template<typename ElementT, typename QueueT>
size_t LLThreadSafeQueue<ElementT, QueueT>::size(void)
{
    LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD;
    lock_t lock(mLock);
    return mStorage.size();
}


template<typename ElementT, typename QueueT>
void LLThreadSafeQueue<ElementT, QueueT>::close()
{
    LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD;
    lock_t lock(mLock);
    mClosed = true;
    lock.unlock();
    // wake up any blocked pop() calls
    mEmptyCond.notify_all();
    // wake up any blocked push() calls
    mCapacityCond.notify_all();
}


template<typename ElementT, typename QueueT>
bool LLThreadSafeQueue<ElementT, QueueT>::isClosed()
{
    LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD;
    lock_t lock(mLock);
    return mClosed;
}


template<typename ElementT, typename QueueT>
bool LLThreadSafeQueue<ElementT, QueueT>::done()
{
    LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD;
    lock_t lock(mLock);
    return mClosed && mStorage.empty();
}

#endif