diff options
Diffstat (limited to 'indra/llmessage')
| -rw-r--r-- | indra/llmessage/llpacketbuffer.cpp | 26 | ||||
| -rw-r--r-- | indra/llmessage/llpacketbuffer.h | 12 | ||||
| -rw-r--r-- | indra/llmessage/llpacketring.cpp | 444 | ||||
| -rw-r--r-- | indra/llmessage/llpacketring.h | 79 | ||||
| -rw-r--r-- | indra/llmessage/message.cpp | 13 | ||||
| -rw-r--r-- | indra/llmessage/message.h | 3 | ||||
| -rw-r--r-- | indra/llmessage/net.cpp | 10 | 
7 files changed, 286 insertions, 301 deletions
| diff --git a/indra/llmessage/llpacketbuffer.cpp b/indra/llmessage/llpacketbuffer.cpp index dc5c7a73cb..0b04a560be 100644 --- a/indra/llmessage/llpacketbuffer.cpp +++ b/indra/llmessage/llpacketbuffer.cpp @@ -32,8 +32,6 @@  #include "lltimer.h"  #include "llhost.h" -/////////////////////////////////////////////////////////// -  LLPacketBuffer::LLPacketBuffer(const LLHost &host, const char *datap, const S32 size) : mHost(host)  {      mSize = 0; @@ -41,7 +39,7 @@ LLPacketBuffer::LLPacketBuffer(const LLHost &host, const char *datap, const S32      if (size > NET_BUFFER_SIZE)      { -        LL_ERRS() << "Sending packet > " << NET_BUFFER_SIZE << " of size " << size << LL_ENDL; +        LL_ERRS() << "Constructing packet with size=" << size << " > " << NET_BUFFER_SIZE << LL_ENDL;      }      else      { @@ -51,7 +49,6 @@ LLPacketBuffer::LLPacketBuffer(const LLHost &host, const char *datap, const S32              mSize = size;          }      } -  }  LLPacketBuffer::LLPacketBuffer (S32 hSocket) @@ -59,18 +56,29 @@ LLPacketBuffer::LLPacketBuffer (S32 hSocket)      init(hSocket);  } -/////////////////////////////////////////////////////////// -  LLPacketBuffer::~LLPacketBuffer ()  {  } -/////////////////////////////////////////////////////////// - -void LLPacketBuffer::init (S32 hSocket) +void LLPacketBuffer::init(S32 hSocket)  {      mSize = receive_packet(hSocket, mData);      mHost = ::get_sender();      mReceivingIF = ::get_receiving_interface();  } +void LLPacketBuffer::init(const char* buffer, S32 data_size, const LLHost& host) +{ +    if (data_size > NET_BUFFER_SIZE) +    { +        LL_ERRS() << "Initializing packet with size=" << data_size << " > " << NET_BUFFER_SIZE << LL_ENDL; +    } +    else +    { +        memcpy(mData, buffer, data_size); +        mSize = data_size; +        mHost = host; +        mReceivingIF = ::get_receiving_interface(); +    } +} + diff --git a/indra/llmessage/llpacketbuffer.h b/indra/llmessage/llpacketbuffer.h index a2d2973fb0..ac4012d330 100644 --- a/indra/llmessage/llpacketbuffer.h +++ b/indra/llmessage/llpacketbuffer.h @@ -35,20 +35,22 @@ class LLPacketBuffer  {  public:      LLPacketBuffer(const LLHost &host, const char *datap, const S32 size); -    LLPacketBuffer(S32 hSocket);           // receive a packet +    LLPacketBuffer(S32 hSocket);    // receive a packet      ~LLPacketBuffer();      S32         getSize() const                 { return mSize; }      const char  *getData() const                { return mData; }      LLHost      getHost() const                 { return mHost; }      LLHost      getReceivingInterface() const   { return mReceivingIF; } +      void init(S32 hSocket); +    void init(const char* buffer, S32 data_size, const LLHost& host);  protected: -    char    mData[NET_BUFFER_SIZE];        // packet data       /* Flawfinder : ignore */ -    S32     mSize;          // size of buffer in bytes -    LLHost  mHost;         // source/dest IP and port -    LLHost  mReceivingIF;         // source/dest IP and port +    char    mData[NET_BUFFER_SIZE]; // packet data       /* Flawfinder : ignore */ +    S32     mSize;                  // size of buffer in bytes +    LLHost  mHost;                  // source/dest IP and port +    LLHost  mReceivingIF;           // source/dest IP and port  };  #endif diff --git a/indra/llmessage/llpacketring.cpp b/indra/llmessage/llpacketring.cpp index be838770a8..ae5a2168db 100644 --- a/indra/llmessage/llpacketring.cpp +++ b/indra/llmessage/llpacketring.cpp @@ -1,6 +1,6 @@  /**   * @file llpacketring.cpp - * @brief implementation of LLPacketRing class for a packet. + * @brief implementation of LLPacketRing class.   *   * $LicenseInfo:firstyear=2001&license=viewerlgpl$   * Second Life Viewer Source Code @@ -43,329 +43,301 @@  #include "message.h"  #include "u64.h" -/////////////////////////////////////////////////////////// -LLPacketRing::LLPacketRing () : -    mUseInThrottle(false), -    mUseOutThrottle(false), -    mInThrottle(256000.f), -    mOutThrottle(64000.f), -    mActualBitsIn(0), -    mActualBitsOut(0), -    mMaxBufferLength(64000), -    mInBufferLength(0), -    mOutBufferLength(0), -    mDropPercentage(0.0f), -    mPacketsToDrop(0x0) +constexpr S16 MAX_BUFFER_RING_SIZE = 1024; +constexpr S16 DEFAULT_BUFFER_RING_SIZE = 256; + +LLPacketRing::LLPacketRing () +    : mPacketRing(DEFAULT_BUFFER_RING_SIZE, nullptr)  { +    LLHost invalid_host; +    for (size_t i = 0; i < mPacketRing.size(); ++i) +    { +        mPacketRing[i] = new LLPacketBuffer(invalid_host, nullptr, 0); +    }  } -///////////////////////////////////////////////////////////  LLPacketRing::~LLPacketRing ()  { -    cleanup(); +    for (auto packet : mPacketRing) +    { +        delete packet; +    } +    mPacketRing.clear(); +    mNumBufferedPackets = 0; +    mNumBufferedBytes = 0; +    mHeadIndex = 0;  } -/////////////////////////////////////////////////////////// -void LLPacketRing::cleanup () +S32 LLPacketRing::receivePacket (S32 socket, char *datap)  { -    LLPacketBuffer *packetp; +    bool drop = computeDrop(); +    return (mNumBufferedPackets > 0) ? +        receiveOrDropBufferedPacket(datap, drop) : +        receiveOrDropPacket(socket, datap, drop); +} -    while (!mReceiveQueue.empty()) +bool send_packet_helper(int socket, const char * datap, S32 data_size, LLHost host) +{ +    if (!LLProxy::isSOCKSProxyEnabled())      { -        packetp = mReceiveQueue.front(); -        delete packetp; -        mReceiveQueue.pop(); +        return send_packet(socket, datap, data_size, host.getAddress(), host.getPort());      } -    while (!mSendQueue.empty()) -    { -        packetp = mSendQueue.front(); -        delete packetp; -        mSendQueue.pop(); -    } -} +    char headered_send_buffer[NET_BUFFER_SIZE + SOCKS_HEADER_SIZE]; -/////////////////////////////////////////////////////////// -void LLPacketRing::dropPackets (U32 num_to_drop) -{ -    mPacketsToDrop += num_to_drop; -} +    proxywrap_t *socks_header = static_cast<proxywrap_t*>(static_cast<void*>(&headered_send_buffer)); +    socks_header->rsv   = 0; +    socks_header->addr  = host.getAddress(); +    socks_header->port  = htons(host.getPort()); +    socks_header->atype = ADDRESS_IPV4; +    socks_header->frag  = 0; -/////////////////////////////////////////////////////////// -void LLPacketRing::setDropPercentage (F32 percent_to_drop) -{ -    mDropPercentage = percent_to_drop; -} +    memcpy(headered_send_buffer + SOCKS_HEADER_SIZE, datap, data_size); -void LLPacketRing::setUseInThrottle(const bool use_throttle) -{ -    mUseInThrottle = use_throttle; +    return send_packet( socket, +                        headered_send_buffer, +                        data_size + SOCKS_HEADER_SIZE, +                        LLProxy::getInstance()->getUDPProxy().getAddress(), +                        LLProxy::getInstance()->getUDPProxy().getPort());  } -void LLPacketRing::setUseOutThrottle(const bool use_throttle) +bool LLPacketRing::sendPacket(int socket, const char * datap, S32 data_size, LLHost host)  { -    mUseOutThrottle = use_throttle; +    mActualBytesOut += data_size; +    return send_packet_helper(socket, datap, data_size, host);  } -void LLPacketRing::setInBandwidth(const F32 bps) +void LLPacketRing::dropPackets (U32 num_to_drop)  { -    mInThrottle.setRate(bps); +    mPacketsToDrop += num_to_drop;  } -void LLPacketRing::setOutBandwidth(const F32 bps) +void LLPacketRing::setDropPercentage (F32 percent_to_drop)  { -    mOutThrottle.setRate(bps); +    mDropPercentage = percent_to_drop;  } -/////////////////////////////////////////////////////////// -S32 LLPacketRing::receiveFromRing (S32 socket, char *datap) -{ -    if (mInThrottle.checkOverflow(0)) -    { -        // We don't have enough bandwidth, don't give them a packet. -        return 0; -    } - -    LLPacketBuffer *packetp = NULL; -    if (mReceiveQueue.empty()) +bool LLPacketRing::computeDrop() +{ +    bool drop= (mDropPercentage > 0.0f && (ll_frand(100.f) < mDropPercentage)); +    if (drop)      { -        // No packets on the queue, don't give them any. -        return 0; +        ++mPacketsToDrop;      } - -    S32 packet_size = 0; -    packetp = mReceiveQueue.front(); -    mReceiveQueue.pop(); -    packet_size = packetp->getSize(); -    if (packetp->getData() != NULL) +    if (mPacketsToDrop > 0)      { -        memcpy(datap, packetp->getData(), packet_size); /*Flawfinder: ignore*/ +        --mPacketsToDrop; +        drop = true;      } -    // need to set sender IP/port!! -    mLastSender = packetp->getHost(); -    mLastReceivingIF = packetp->getReceivingInterface(); -    delete packetp; - -    this->mInBufferLength -= packet_size; - -    // Adjust the throttle -    mInThrottle.throttleOverflow(packet_size * 8.f); -    return packet_size; +    return drop;  } -/////////////////////////////////////////////////////////// -S32 LLPacketRing::receivePacket (S32 socket, char *datap) +S32 LLPacketRing::receiveOrDropPacket(S32 socket, char *datap, bool drop)  {      S32 packet_size = 0; -    // If using the throttle, simulate a limited size input buffer. -    if (mUseInThrottle) +    // pull straight from socket +    if (LLProxy::isSOCKSProxyEnabled())      { -        bool done = false; - -        // push any current net packet (if any) onto delay ring -        while (!done) +        char buffer[NET_BUFFER_SIZE + SOCKS_HEADER_SIZE];   /* Flawfinder ignore */ +        packet_size = receive_packet(socket, buffer); +        if (packet_size > 0)          { -            LLPacketBuffer *packetp; -            packetp = new LLPacketBuffer(socket); - -            if (packetp->getSize()) -            { -                mActualBitsIn += packetp->getSize() * 8; - -                // Fake packet loss -                if (mDropPercentage && (ll_frand(100.f) < mDropPercentage)) -                { -                    mPacketsToDrop++; -                } - -                if (mPacketsToDrop) -                { -                    delete packetp; -                    packetp = NULL; -                    packet_size = 0; -                    mPacketsToDrop--; -                } -            } +            mActualBytesIn += packet_size; +        } -            // If we faked packet loss, then we don't have a packet -            // to use for buffer overflow testing -            if (packetp) +        if (packet_size > SOCKS_HEADER_SIZE) +        { +            if (drop)              { -                if (mInBufferLength + packetp->getSize() > mMaxBufferLength) -                { -                    // Toss it. -                    LL_WARNS() << "Throwing away packet, overflowing buffer" << LL_ENDL; -                    delete packetp; -                    packetp = NULL; -                } -                else if (packetp->getSize()) -                { -                    mReceiveQueue.push(packetp); -                    mInBufferLength += packetp->getSize(); -                } -                else -                { -                    delete packetp; -                    packetp = NULL; -                    done = true; -                } +                packet_size = 0;              }              else              { -                // No packetp, keep going? - no packetp == faked packet loss -            } -        } - -        // Now, grab data off of the receive queue according to our -        // throttled bandwidth settings. -        packet_size = receiveFromRing(socket, datap); -    } -    else -    { -        // no delay, pull straight from net -        if (LLProxy::isSOCKSProxyEnabled()) -        { -            U8 buffer[NET_BUFFER_SIZE + SOCKS_HEADER_SIZE]; -            packet_size = receive_packet(socket, static_cast<char*>(static_cast<void*>(buffer))); - -            if (packet_size > SOCKS_HEADER_SIZE) -            {                  // *FIX We are assuming ATYP is 0x01 (IPv4), not 0x03 (hostname) or 0x04 (IPv6) -                memcpy(datap, buffer + SOCKS_HEADER_SIZE, packet_size - SOCKS_HEADER_SIZE); +                packet_size -= SOCKS_HEADER_SIZE; // The unwrapped packet size +                memcpy(datap, buffer + SOCKS_HEADER_SIZE, packet_size);                  proxywrap_t * header = static_cast<proxywrap_t*>(static_cast<void*>(buffer));                  mLastSender.setAddress(header->addr);                  mLastSender.setPort(ntohs(header->port)); - -                packet_size -= SOCKS_HEADER_SIZE; // The unwrapped packet size -            } -            else -            { -                packet_size = 0; +                mLastReceivingIF = ::get_receiving_interface();              }          }          else          { -            packet_size = receive_packet(socket, datap); -            mLastSender = ::get_sender(); +            packet_size = 0;          } - -        mLastReceivingIF = ::get_receiving_interface(); - -        if (packet_size)  // did we actually get a packet? +    } +    else +    { +        packet_size = receive_packet(socket, datap); +        if (packet_size > 0)          { -            if (mDropPercentage && (ll_frand(100.f) < mDropPercentage)) +            mActualBytesIn += packet_size; +            if (drop)              { -                mPacketsToDrop++; +                packet_size = 0;              } - -            if (mPacketsToDrop) +            else              { -                packet_size = 0; -                mPacketsToDrop--; +                mLastSender = ::get_sender(); +                mLastReceivingIF = ::get_receiving_interface();              }          }      } -      return packet_size;  } -bool LLPacketRing::sendPacket(int h_socket, char * send_buffer, S32 buf_size, LLHost host) +S32 LLPacketRing::receiveOrDropBufferedPacket(char *datap, bool drop)  { -    bool status = true; -    if (!mUseOutThrottle) +    assert(mNumBufferedPackets > 0); +    S32 packet_size = 0; + +    S16 ring_size = (S16)(mPacketRing.size()); +    S16 packet_index = (mHeadIndex + ring_size - mNumBufferedPackets) % ring_size; +    LLPacketBuffer* packet = mPacketRing[packet_index]; +    packet_size = packet->getSize(); +    mLastSender = packet->getHost(); +    mLastReceivingIF = packet->getReceivingInterface(); + +    --mNumBufferedPackets; +    mNumBufferedBytes -= packet_size; +    if (mNumBufferedPackets == 0)      { -        return sendPacketImpl(h_socket, send_buffer, buf_size, host ); +        assert(mNumBufferedBytes == 0); +    } + +    if (!drop) +    { +        assert(packet_size > 0); +        memcpy(datap, packet->getData(), packet_size);      }      else      { -        mActualBitsOut += buf_size * 8; -        LLPacketBuffer *packetp = NULL; -        // See if we've got enough throttle to send a packet. -        while (!mOutThrottle.checkOverflow(0.f)) -        { -            // While we have enough bandwidth, send a packet from the queue or the current packet +        packet_size = 0; +    } +    return packet_size; +} + +S32 LLPacketRing::bufferInboundPacket(S32 socket) +{ +    if (mNumBufferedPackets == mPacketRing.size() && mNumBufferedPackets < MAX_BUFFER_RING_SIZE) +    { +        expandRing(); +    } -            S32 packet_size = 0; -            if (!mSendQueue.empty()) +    LLPacketBuffer* packet = mPacketRing[mHeadIndex]; +    S32 old_packet_size = packet->getSize(); +    S32 packet_size = 0; +    if (LLProxy::isSOCKSProxyEnabled()) +    { +        char buffer[NET_BUFFER_SIZE + SOCKS_HEADER_SIZE];   /* Flawfinder ignore */ +        packet_size = receive_packet(socket, buffer); +        if (packet_size > 0) +        { +            mActualBytesIn += packet_size; +            if (packet_size > SOCKS_HEADER_SIZE)              { -                // Send a packet off of the queue -                LLPacketBuffer *packetp = mSendQueue.front(); -                mSendQueue.pop(); +                // *FIX We are assuming ATYP is 0x01 (IPv4), not 0x03 (hostname) or 0x04 (IPv6) -                mOutBufferLength -= packetp->getSize(); -                packet_size = packetp->getSize(); +                proxywrap_t * header = static_cast<proxywrap_t*>(static_cast<void*>(buffer)); +                LLHost sender; +                sender.setAddress(header->addr); +                sender.setPort(ntohs(header->port)); -                status = sendPacketImpl(h_socket, packetp->getData(), packet_size, packetp->getHost()); +                packet_size -= SOCKS_HEADER_SIZE; // The unwrapped packet size +                packet->init(buffer + SOCKS_HEADER_SIZE, packet_size, sender); -                delete packetp; -                // Update the throttle -                mOutThrottle.throttleOverflow(packet_size * 8.f); +                mHeadIndex = (mHeadIndex + 1) % (S16)(mPacketRing.size()); +                if (mNumBufferedPackets < MAX_BUFFER_RING_SIZE) +                { +                    ++mNumBufferedPackets; +                    mNumBufferedBytes += packet_size; +                } +                else +                { +                    // we overwrote an older packet +                    mNumBufferedBytes += packet_size - old_packet_size; +                }              }              else              { -                // If the queue's empty, we can just send this packet right away. -                status =  sendPacketImpl(h_socket, send_buffer, buf_size, host ); -                packet_size = buf_size; - -                // Update the throttle -                mOutThrottle.throttleOverflow(packet_size * 8.f); - -                // This was the packet we're sending now, there are no other packets -                // that we need to send -                return status; +                packet_size = 0;              } - -        } - -        // We haven't sent the incoming packet, add it to the queue -        if (mOutBufferLength + buf_size > mMaxBufferLength) -        { -            // Nuke this packet, we overflowed the buffer. -            // Toss it. -            LL_WARNS() << "Throwing away outbound packet, overflowing buffer" << LL_ENDL;          } -        else +    } +    else +    { +        packet->init(socket); +        packet_size = packet->getSize(); +        if (packet_size > 0)          { -            static LLTimer queue_timer; -            if ((mOutBufferLength > 4192) && queue_timer.getElapsedTimeF32() > 1.f) +            mActualBytesIn += packet_size; + +            mHeadIndex = (mHeadIndex + 1) % (S16)(mPacketRing.size()); +            if (mNumBufferedPackets < MAX_BUFFER_RING_SIZE)              { -                // Add it to the queue -                LL_INFOS() << "Outbound packet queue " << mOutBufferLength << " bytes" << LL_ENDL; -                queue_timer.reset(); +                ++mNumBufferedPackets; +                mNumBufferedBytes += packet_size; +            } +            else +            { +                // we overwrote an older packet +                mNumBufferedBytes += packet_size - old_packet_size;              } -            packetp = new LLPacketBuffer(host, send_buffer, buf_size); - -            mOutBufferLength += packetp->getSize(); -            mSendQueue.push(packetp);          }      } - -    return status; +    return packet_size;  } -bool LLPacketRing::sendPacketImpl(int h_socket, const char * send_buffer, S32 buf_size, LLHost host) +S32 LLPacketRing::drainSocket(S32 socket)  { - -    if (!LLProxy::isSOCKSProxyEnabled()) +    // drain into buffer +    S32 packet_size = 1; +    S32 num_loops = 0; +    S32 old_num_packets = mNumBufferedPackets; +    while (packet_size > 0)      { -        return send_packet(h_socket, send_buffer, buf_size, host.getAddress(), host.getPort()); +        packet_size = bufferInboundPacket(socket); +        ++num_loops;      } +    S32 num_dropped_packets = (num_loops - 1 + old_num_packets) - mNumBufferedPackets; +    if (num_dropped_packets > 0) +    { +        LL_WARNS("Messaging") << "dropped " << num_dropped_packets << " UDP packets" << LL_ENDL; +    } +    return (S32)(mNumBufferedPackets); +} -    char headered_send_buffer[NET_BUFFER_SIZE + SOCKS_HEADER_SIZE]; +bool LLPacketRing::expandRing() +{ +    // compute larger size +    constexpr S16 BUFFER_RING_EXPANSION = 256; +    S16 old_size = (S16)(mPacketRing.size()); +    S16 new_size = llmin(old_size + BUFFER_RING_EXPANSION, MAX_BUFFER_RING_SIZE); +    if (new_size == old_size) +    { +        // mPacketRing is already maxed out +        return false; +    } -    proxywrap_t *socks_header = static_cast<proxywrap_t*>(static_cast<void*>(&headered_send_buffer)); -    socks_header->rsv   = 0; -    socks_header->addr  = host.getAddress(); -    socks_header->port  = htons(host.getPort()); -    socks_header->atype = ADDRESS_IPV4; -    socks_header->frag  = 0; +    // make a larger ring and copy packet pointers +    std::vector<LLPacketBuffer*> new_ring(new_size, nullptr); +    for (S16 i = 0; i < old_size; ++i) +    { +        S16 j = (mHeadIndex + i) % old_size; +        new_ring[i] = mPacketRing[j]; +    } -    memcpy(headered_send_buffer + SOCKS_HEADER_SIZE, send_buffer, buf_size); +    // allocate new packets for the remainder of new_ring +    LLHost invalid_host; +    for (S16 i = old_size; i < new_size; ++i) +    { +        new_ring[i] = new LLPacketBuffer(invalid_host, nullptr, 0); +    } -    return send_packet( h_socket, -                        headered_send_buffer, -                        buf_size + SOCKS_HEADER_SIZE, -                        LLProxy::getInstance()->getUDPProxy().getAddress(), -                        LLProxy::getInstance()->getUDPProxy().getPort()); +    // swap the rings and reset mHeadIndex +    mPacketRing.swap(new_ring); +    mHeadIndex = mNumBufferedPackets; +    return true;  } diff --git a/indra/llmessage/llpacketring.h b/indra/llmessage/llpacketring.h index f0e95f8524..0dff2c63b1 100644 --- a/indra/llmessage/llpacketring.h +++ b/indra/llmessage/llpacketring.h @@ -25,16 +25,14 @@   * $/LicenseInfo$   */ -#ifndef LL_LLPACKETRING_H -#define LL_LLPACKETRING_H +#pragma once -#include <queue> +#include <vector>  #include "llhost.h"  #include "llpacketbuffer.h" -#include "llproxy.h"  #include "llthrottle.h" -#include "net.h" +  class LLPacketRing  { @@ -42,60 +40,65 @@ public:      LLPacketRing();      ~LLPacketRing(); -    void cleanup(); +    // receive one packet: either buffered or from the socket +    S32  receivePacket (S32 socket, char *datap); + +    // send one packet +    bool sendPacket(int h_socket, const char * send_buffer, S32 buf_size, LLHost host); + +    // drains packets from socket and returns final mNumBufferedPackets +    S32 drainSocket(S32 socket);      void dropPackets(U32);      void setDropPercentage (F32 percent_to_drop); -    void setUseInThrottle(const bool use_throttle); -    void setUseOutThrottle(const bool use_throttle); -    void setInBandwidth(const F32 bps); -    void setOutBandwidth(const F32 bps); -    S32  receivePacket (S32 socket, char *datap); -    S32  receiveFromRing (S32 socket, char *datap); -    bool sendPacket(int h_socket, char * send_buffer, S32 buf_size, LLHost host); +    inline LLHost getLastSender() const; +    inline LLHost getLastReceivingInterface() const; -    inline LLHost getLastSender(); -    inline LLHost getLastReceivingInterface(); +    S32 getActualInBytes() const { return mActualBytesIn; } +    S32 getActualOutBytes() const { return mActualBytesOut; } +    S32 getAndResetActualInBits()   { S32 bits = mActualBytesIn * 8; mActualBytesIn = 0; return bits;} +    S32 getAndResetActualOutBits()  { S32 bits = mActualBytesOut * 8; mActualBytesOut = 0; return bits;} -    S32 getAndResetActualInBits()               { S32 bits = mActualBitsIn; mActualBitsIn = 0; return bits;} -    S32 getAndResetActualOutBits()              { S32 bits = mActualBitsOut; mActualBitsOut = 0; return bits;} +    S32 getNumBufferedPackets() const { return (S32)(mNumBufferedPackets); } +    S32 getNumBufferedBytes() const { return mNumBufferedBytes; }  protected: -    bool mUseInThrottle; -    bool mUseOutThrottle; +    // returns 'true' if we should intentionally drop a packet +    bool computeDrop(); -    // For simulating a lower-bandwidth connection - BPS -    LLThrottle mInThrottle; -    LLThrottle mOutThrottle; +    // returns packet_size of received packet, zero or less if no packet found +    S32 receiveOrDropPacket(S32 socket, char *datap, bool drop); +    S32 receiveOrDropBufferedPacket(char *datap, bool drop); -    S32 mActualBitsIn; -    S32 mActualBitsOut; -    S32 mMaxBufferLength;           // How much data can we queue up before dropping data. -    S32 mInBufferLength;            // Current incoming buffer length -    S32 mOutBufferLength;           // Current outgoing buffer length +    // returns packet_size of packet buffered +    S32 bufferInboundPacket(S32 socket); -    F32 mDropPercentage;            // % of packets to drop -    U32 mPacketsToDrop;             // drop next n packets +    // returns 'true' if ring was expanded +    bool expandRing(); -    std::queue<LLPacketBuffer *> mReceiveQueue; -    std::queue<LLPacketBuffer *> mSendQueue; +protected: +    std::vector<LLPacketBuffer*> mPacketRing; +    S16 mHeadIndex { 0 }; +    S16 mNumBufferedPackets { 0 }; +    S32 mNumBufferedBytes { 0 }; + +    S32 mActualBytesIn { 0 }; +    S32 mActualBytesOut { 0 }; +    F32 mDropPercentage { 0.0f };   // % of inbound packets to drop +    U32 mPacketsToDrop { 0 };       // drop next inbound n packets +    // These are the sender and receiving_interface for the last packet delivered by receivePacket()      LLHost mLastSender;      LLHost mLastReceivingIF; - -private: -    bool sendPacketImpl(int h_socket, const char * send_buffer, S32 buf_size, LLHost host);  }; -inline LLHost LLPacketRing::getLastSender() +inline LLHost LLPacketRing::getLastSender() const  {      return mLastSender;  } -inline LLHost LLPacketRing::getLastReceivingInterface() +inline LLHost LLPacketRing::getLastReceivingInterface() const  {      return mLastReceivingIF;  } - -#endif diff --git a/indra/llmessage/message.cpp b/indra/llmessage/message.cpp index cfa5178fc6..c130b7a6db 100644 --- a/indra/llmessage/message.cpp +++ b/indra/llmessage/message.cpp @@ -656,8 +656,7 @@ bool LLMessageSystem::checkMessages(LockMessageChecker&, S64 frame_count )              // UseCircuitCode is allowed in even from an invalid circuit, so that              // we can toss circuits around. -            if( -                valid_packet && +            else if (                  !cdp &&                  (mTemplateMessageReader->getMessageName() !=                   _PREHASH_UseCircuitCode)) @@ -667,8 +666,7 @@ bool LLMessageSystem::checkMessages(LockMessageChecker&, S64 frame_count )                  valid_packet = false;              } -            if( -                valid_packet && +            if ( valid_packet &&                  cdp &&                  !cdp->getTrusted() &&                  mTemplateMessageReader->isTrusted()) @@ -680,7 +678,7 @@ bool LLMessageSystem::checkMessages(LockMessageChecker&, S64 frame_count )                  valid_packet = false;              } -            if( valid_packet ) +            if ( valid_packet )              {                  logValidMsg(cdp, host, recv_reliable, recv_resent, acks>0 );                  valid_packet = mTemplateMessageReader->readMessage(buffer, host); @@ -821,6 +819,11 @@ void LLMessageSystem::processAcks(LockMessageChecker&, F32 collect_time)      }  } +S32 LLMessageSystem::drainUdpSocket() +{ +    return mPacketRing.drainSocket(mSocket); +} +  void LLMessageSystem::copyMessageReceivedToSend()  {      // NOTE: babbage: switch builder to match reader to avoid diff --git a/indra/llmessage/message.h b/indra/llmessage/message.h index b4b0d94021..1844d5e7cd 100644 --- a/indra/llmessage/message.h +++ b/indra/llmessage/message.h @@ -417,6 +417,9 @@ public:      bool    checkMessages(LockMessageChecker&, S64 frame_count = 0 );      void    processAcks(LockMessageChecker&, F32 collect_time = 0.f); +    // returns total number of buffered packets after the drain +    S32     drainUdpSocket(); +      bool    isMessageFast(const char *msg);      bool    isMessage(const char *msg)      { diff --git a/indra/llmessage/net.cpp b/indra/llmessage/net.cpp index f153c938cf..2be5a9e5b6 100644 --- a/indra/llmessage/net.cpp +++ b/indra/llmessage/net.cpp @@ -76,14 +76,8 @@ static U32 gsnReceivingIFAddr = INVALID_HOST_IP_ADDRESS; // Address to which dat  const char* LOOPBACK_ADDRESS_STRING = "127.0.0.1";  const char* BROADCAST_ADDRESS_STRING = "255.255.255.255"; -#if LL_DARWIN -    // macOS returns an error when trying to set these to 400000.  Smaller values succeed. -    const int   SEND_BUFFER_SIZE    = 200000; -    const int   RECEIVE_BUFFER_SIZE = 200000; -#else // LL_DARWIN -    const int   SEND_BUFFER_SIZE    = 400000; -    const int   RECEIVE_BUFFER_SIZE = 400000; -#endif // LL_DARWIN +const int   SEND_BUFFER_SIZE    = 200000; +const int   RECEIVE_BUFFER_SIZE = 800000;  // universal functions (cross-platform) | 
