diff options
author | Andrew Meadows <leviathan@lindenlab.com> | 2025-02-18 11:38:52 -0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-02-18 11:38:52 -0800 |
commit | 6d0b0a77eed584c6bcf77f6cd5fdbdbf89a987d7 (patch) | |
tree | ac6b0a436e9950086db495fcf07a87986641083b | |
parent | 74d2ed918dd185cbc47ed64f78be76ae6b94a60f (diff) |
drain UDP socket to avoid dropped packets (#3565)
drain UDP socket in idleNetwork() to avoid dropped packets
-rw-r--r-- | indra/llappearance/llavatarappearance.cpp | 1 | ||||
-rw-r--r-- | indra/llmessage/llpacketbuffer.cpp | 26 | ||||
-rw-r--r-- | indra/llmessage/llpacketbuffer.h | 12 | ||||
-rw-r--r-- | indra/llmessage/llpacketring.cpp | 444 | ||||
-rw-r--r-- | indra/llmessage/llpacketring.h | 79 | ||||
-rw-r--r-- | indra/llmessage/message.cpp | 13 | ||||
-rw-r--r-- | indra/llmessage/message.h | 3 | ||||
-rw-r--r-- | indra/llmessage/net.cpp | 10 | ||||
-rw-r--r-- | indra/newview/llappviewer.cpp | 61 | ||||
-rw-r--r-- | indra/newview/llappviewer.h | 1 | ||||
-rw-r--r-- | indra/newview/llstartup.cpp | 15 |
11 files changed, 310 insertions, 355 deletions
diff --git a/indra/llappearance/llavatarappearance.cpp b/indra/llappearance/llavatarappearance.cpp index 95d55c835f..3d66809ed6 100644 --- a/indra/llappearance/llavatarappearance.cpp +++ b/indra/llappearance/llavatarappearance.cpp @@ -799,7 +799,6 @@ void LLAvatarAppearance::buildCharacter() bool status = loadAvatar(); stop_glerror(); -// gPrintMessagesThisFrame = true; LL_DEBUGS() << "Avatar load took " << timer.getElapsedTimeF32() << " seconds." << LL_ENDL; if (!status) diff --git a/indra/llmessage/llpacketbuffer.cpp b/indra/llmessage/llpacketbuffer.cpp index dc5c7a73cb..0b04a560be 100644 --- a/indra/llmessage/llpacketbuffer.cpp +++ b/indra/llmessage/llpacketbuffer.cpp @@ -32,8 +32,6 @@ #include "lltimer.h" #include "llhost.h" -/////////////////////////////////////////////////////////// - LLPacketBuffer::LLPacketBuffer(const LLHost &host, const char *datap, const S32 size) : mHost(host) { mSize = 0; @@ -41,7 +39,7 @@ LLPacketBuffer::LLPacketBuffer(const LLHost &host, const char *datap, const S32 if (size > NET_BUFFER_SIZE) { - LL_ERRS() << "Sending packet > " << NET_BUFFER_SIZE << " of size " << size << LL_ENDL; + LL_ERRS() << "Constructing packet with size=" << size << " > " << NET_BUFFER_SIZE << LL_ENDL; } else { @@ -51,7 +49,6 @@ LLPacketBuffer::LLPacketBuffer(const LLHost &host, const char *datap, const S32 mSize = size; } } - } LLPacketBuffer::LLPacketBuffer (S32 hSocket) @@ -59,18 +56,29 @@ LLPacketBuffer::LLPacketBuffer (S32 hSocket) init(hSocket); } -/////////////////////////////////////////////////////////// - LLPacketBuffer::~LLPacketBuffer () { } -/////////////////////////////////////////////////////////// - -void LLPacketBuffer::init (S32 hSocket) +void LLPacketBuffer::init(S32 hSocket) { mSize = receive_packet(hSocket, mData); mHost = ::get_sender(); mReceivingIF = ::get_receiving_interface(); } +void LLPacketBuffer::init(const char* buffer, S32 data_size, const LLHost& host) +{ + if (data_size > NET_BUFFER_SIZE) + { + LL_ERRS() << "Initializing packet with size=" << data_size << " > " << NET_BUFFER_SIZE << LL_ENDL; + } + else + { + memcpy(mData, buffer, data_size); + mSize = data_size; + mHost = host; + mReceivingIF = ::get_receiving_interface(); + } +} + diff --git a/indra/llmessage/llpacketbuffer.h b/indra/llmessage/llpacketbuffer.h index a2d2973fb0..ac4012d330 100644 --- a/indra/llmessage/llpacketbuffer.h +++ b/indra/llmessage/llpacketbuffer.h @@ -35,20 +35,22 @@ class LLPacketBuffer { public: LLPacketBuffer(const LLHost &host, const char *datap, const S32 size); - LLPacketBuffer(S32 hSocket); // receive a packet + LLPacketBuffer(S32 hSocket); // receive a packet ~LLPacketBuffer(); S32 getSize() const { return mSize; } const char *getData() const { return mData; } LLHost getHost() const { return mHost; } LLHost getReceivingInterface() const { return mReceivingIF; } + void init(S32 hSocket); + void init(const char* buffer, S32 data_size, const LLHost& host); protected: - char mData[NET_BUFFER_SIZE]; // packet data /* Flawfinder : ignore */ - S32 mSize; // size of buffer in bytes - LLHost mHost; // source/dest IP and port - LLHost mReceivingIF; // source/dest IP and port + char mData[NET_BUFFER_SIZE]; // packet data /* Flawfinder : ignore */ + S32 mSize; // size of buffer in bytes + LLHost mHost; // source/dest IP and port + LLHost mReceivingIF; // source/dest IP and port }; #endif diff --git a/indra/llmessage/llpacketring.cpp b/indra/llmessage/llpacketring.cpp index be838770a8..ae5a2168db 100644 --- a/indra/llmessage/llpacketring.cpp +++ b/indra/llmessage/llpacketring.cpp @@ -1,6 +1,6 @@ /** * @file llpacketring.cpp - * @brief implementation of LLPacketRing class for a packet. + * @brief implementation of LLPacketRing class. * * $LicenseInfo:firstyear=2001&license=viewerlgpl$ * Second Life Viewer Source Code @@ -43,329 +43,301 @@ #include "message.h" #include "u64.h" -/////////////////////////////////////////////////////////// -LLPacketRing::LLPacketRing () : - mUseInThrottle(false), - mUseOutThrottle(false), - mInThrottle(256000.f), - mOutThrottle(64000.f), - mActualBitsIn(0), - mActualBitsOut(0), - mMaxBufferLength(64000), - mInBufferLength(0), - mOutBufferLength(0), - mDropPercentage(0.0f), - mPacketsToDrop(0x0) +constexpr S16 MAX_BUFFER_RING_SIZE = 1024; +constexpr S16 DEFAULT_BUFFER_RING_SIZE = 256; + +LLPacketRing::LLPacketRing () + : mPacketRing(DEFAULT_BUFFER_RING_SIZE, nullptr) { + LLHost invalid_host; + for (size_t i = 0; i < mPacketRing.size(); ++i) + { + mPacketRing[i] = new LLPacketBuffer(invalid_host, nullptr, 0); + } } -/////////////////////////////////////////////////////////// LLPacketRing::~LLPacketRing () { - cleanup(); + for (auto packet : mPacketRing) + { + delete packet; + } + mPacketRing.clear(); + mNumBufferedPackets = 0; + mNumBufferedBytes = 0; + mHeadIndex = 0; } -/////////////////////////////////////////////////////////// -void LLPacketRing::cleanup () +S32 LLPacketRing::receivePacket (S32 socket, char *datap) { - LLPacketBuffer *packetp; + bool drop = computeDrop(); + return (mNumBufferedPackets > 0) ? + receiveOrDropBufferedPacket(datap, drop) : + receiveOrDropPacket(socket, datap, drop); +} - while (!mReceiveQueue.empty()) +bool send_packet_helper(int socket, const char * datap, S32 data_size, LLHost host) +{ + if (!LLProxy::isSOCKSProxyEnabled()) { - packetp = mReceiveQueue.front(); - delete packetp; - mReceiveQueue.pop(); + return send_packet(socket, datap, data_size, host.getAddress(), host.getPort()); } - while (!mSendQueue.empty()) - { - packetp = mSendQueue.front(); - delete packetp; - mSendQueue.pop(); - } -} + char headered_send_buffer[NET_BUFFER_SIZE + SOCKS_HEADER_SIZE]; -/////////////////////////////////////////////////////////// -void LLPacketRing::dropPackets (U32 num_to_drop) -{ - mPacketsToDrop += num_to_drop; -} + proxywrap_t *socks_header = static_cast<proxywrap_t*>(static_cast<void*>(&headered_send_buffer)); + socks_header->rsv = 0; + socks_header->addr = host.getAddress(); + socks_header->port = htons(host.getPort()); + socks_header->atype = ADDRESS_IPV4; + socks_header->frag = 0; -/////////////////////////////////////////////////////////// -void LLPacketRing::setDropPercentage (F32 percent_to_drop) -{ - mDropPercentage = percent_to_drop; -} + memcpy(headered_send_buffer + SOCKS_HEADER_SIZE, datap, data_size); -void LLPacketRing::setUseInThrottle(const bool use_throttle) -{ - mUseInThrottle = use_throttle; + return send_packet( socket, + headered_send_buffer, + data_size + SOCKS_HEADER_SIZE, + LLProxy::getInstance()->getUDPProxy().getAddress(), + LLProxy::getInstance()->getUDPProxy().getPort()); } -void LLPacketRing::setUseOutThrottle(const bool use_throttle) +bool LLPacketRing::sendPacket(int socket, const char * datap, S32 data_size, LLHost host) { - mUseOutThrottle = use_throttle; + mActualBytesOut += data_size; + return send_packet_helper(socket, datap, data_size, host); } -void LLPacketRing::setInBandwidth(const F32 bps) +void LLPacketRing::dropPackets (U32 num_to_drop) { - mInThrottle.setRate(bps); + mPacketsToDrop += num_to_drop; } -void LLPacketRing::setOutBandwidth(const F32 bps) +void LLPacketRing::setDropPercentage (F32 percent_to_drop) { - mOutThrottle.setRate(bps); + mDropPercentage = percent_to_drop; } -/////////////////////////////////////////////////////////// -S32 LLPacketRing::receiveFromRing (S32 socket, char *datap) -{ - if (mInThrottle.checkOverflow(0)) - { - // We don't have enough bandwidth, don't give them a packet. - return 0; - } - - LLPacketBuffer *packetp = NULL; - if (mReceiveQueue.empty()) +bool LLPacketRing::computeDrop() +{ + bool drop= (mDropPercentage > 0.0f && (ll_frand(100.f) < mDropPercentage)); + if (drop) { - // No packets on the queue, don't give them any. - return 0; + ++mPacketsToDrop; } - - S32 packet_size = 0; - packetp = mReceiveQueue.front(); - mReceiveQueue.pop(); - packet_size = packetp->getSize(); - if (packetp->getData() != NULL) + if (mPacketsToDrop > 0) { - memcpy(datap, packetp->getData(), packet_size); /*Flawfinder: ignore*/ + --mPacketsToDrop; + drop = true; } - // need to set sender IP/port!! - mLastSender = packetp->getHost(); - mLastReceivingIF = packetp->getReceivingInterface(); - delete packetp; - - this->mInBufferLength -= packet_size; - - // Adjust the throttle - mInThrottle.throttleOverflow(packet_size * 8.f); - return packet_size; + return drop; } -/////////////////////////////////////////////////////////// -S32 LLPacketRing::receivePacket (S32 socket, char *datap) +S32 LLPacketRing::receiveOrDropPacket(S32 socket, char *datap, bool drop) { S32 packet_size = 0; - // If using the throttle, simulate a limited size input buffer. - if (mUseInThrottle) + // pull straight from socket + if (LLProxy::isSOCKSProxyEnabled()) { - bool done = false; - - // push any current net packet (if any) onto delay ring - while (!done) + char buffer[NET_BUFFER_SIZE + SOCKS_HEADER_SIZE]; /* Flawfinder ignore */ + packet_size = receive_packet(socket, buffer); + if (packet_size > 0) { - LLPacketBuffer *packetp; - packetp = new LLPacketBuffer(socket); - - if (packetp->getSize()) - { - mActualBitsIn += packetp->getSize() * 8; - - // Fake packet loss - if (mDropPercentage && (ll_frand(100.f) < mDropPercentage)) - { - mPacketsToDrop++; - } - - if (mPacketsToDrop) - { - delete packetp; - packetp = NULL; - packet_size = 0; - mPacketsToDrop--; - } - } + mActualBytesIn += packet_size; + } - // If we faked packet loss, then we don't have a packet - // to use for buffer overflow testing - if (packetp) + if (packet_size > SOCKS_HEADER_SIZE) + { + if (drop) { - if (mInBufferLength + packetp->getSize() > mMaxBufferLength) - { - // Toss it. - LL_WARNS() << "Throwing away packet, overflowing buffer" << LL_ENDL; - delete packetp; - packetp = NULL; - } - else if (packetp->getSize()) - { - mReceiveQueue.push(packetp); - mInBufferLength += packetp->getSize(); - } - else - { - delete packetp; - packetp = NULL; - done = true; - } + packet_size = 0; } else { - // No packetp, keep going? - no packetp == faked packet loss - } - } - - // Now, grab data off of the receive queue according to our - // throttled bandwidth settings. - packet_size = receiveFromRing(socket, datap); - } - else - { - // no delay, pull straight from net - if (LLProxy::isSOCKSProxyEnabled()) - { - U8 buffer[NET_BUFFER_SIZE + SOCKS_HEADER_SIZE]; - packet_size = receive_packet(socket, static_cast<char*>(static_cast<void*>(buffer))); - - if (packet_size > SOCKS_HEADER_SIZE) - { // *FIX We are assuming ATYP is 0x01 (IPv4), not 0x03 (hostname) or 0x04 (IPv6) - memcpy(datap, buffer + SOCKS_HEADER_SIZE, packet_size - SOCKS_HEADER_SIZE); + packet_size -= SOCKS_HEADER_SIZE; // The unwrapped packet size + memcpy(datap, buffer + SOCKS_HEADER_SIZE, packet_size); proxywrap_t * header = static_cast<proxywrap_t*>(static_cast<void*>(buffer)); mLastSender.setAddress(header->addr); mLastSender.setPort(ntohs(header->port)); - - packet_size -= SOCKS_HEADER_SIZE; // The unwrapped packet size - } - else - { - packet_size = 0; + mLastReceivingIF = ::get_receiving_interface(); } } else { - packet_size = receive_packet(socket, datap); - mLastSender = ::get_sender(); + packet_size = 0; } - - mLastReceivingIF = ::get_receiving_interface(); - - if (packet_size) // did we actually get a packet? + } + else + { + packet_size = receive_packet(socket, datap); + if (packet_size > 0) { - if (mDropPercentage && (ll_frand(100.f) < mDropPercentage)) + mActualBytesIn += packet_size; + if (drop) { - mPacketsToDrop++; + packet_size = 0; } - - if (mPacketsToDrop) + else { - packet_size = 0; - mPacketsToDrop--; + mLastSender = ::get_sender(); + mLastReceivingIF = ::get_receiving_interface(); } } } - return packet_size; } -bool LLPacketRing::sendPacket(int h_socket, char * send_buffer, S32 buf_size, LLHost host) +S32 LLPacketRing::receiveOrDropBufferedPacket(char *datap, bool drop) { - bool status = true; - if (!mUseOutThrottle) + assert(mNumBufferedPackets > 0); + S32 packet_size = 0; + + S16 ring_size = (S16)(mPacketRing.size()); + S16 packet_index = (mHeadIndex + ring_size - mNumBufferedPackets) % ring_size; + LLPacketBuffer* packet = mPacketRing[packet_index]; + packet_size = packet->getSize(); + mLastSender = packet->getHost(); + mLastReceivingIF = packet->getReceivingInterface(); + + --mNumBufferedPackets; + mNumBufferedBytes -= packet_size; + if (mNumBufferedPackets == 0) { - return sendPacketImpl(h_socket, send_buffer, buf_size, host ); + assert(mNumBufferedBytes == 0); + } + + if (!drop) + { + assert(packet_size > 0); + memcpy(datap, packet->getData(), packet_size); } else { - mActualBitsOut += buf_size * 8; - LLPacketBuffer *packetp = NULL; - // See if we've got enough throttle to send a packet. - while (!mOutThrottle.checkOverflow(0.f)) - { - // While we have enough bandwidth, send a packet from the queue or the current packet + packet_size = 0; + } + return packet_size; +} + +S32 LLPacketRing::bufferInboundPacket(S32 socket) +{ + if (mNumBufferedPackets == mPacketRing.size() && mNumBufferedPackets < MAX_BUFFER_RING_SIZE) + { + expandRing(); + } - S32 packet_size = 0; - if (!mSendQueue.empty()) + LLPacketBuffer* packet = mPacketRing[mHeadIndex]; + S32 old_packet_size = packet->getSize(); + S32 packet_size = 0; + if (LLProxy::isSOCKSProxyEnabled()) + { + char buffer[NET_BUFFER_SIZE + SOCKS_HEADER_SIZE]; /* Flawfinder ignore */ + packet_size = receive_packet(socket, buffer); + if (packet_size > 0) + { + mActualBytesIn += packet_size; + if (packet_size > SOCKS_HEADER_SIZE) { - // Send a packet off of the queue - LLPacketBuffer *packetp = mSendQueue.front(); - mSendQueue.pop(); + // *FIX We are assuming ATYP is 0x01 (IPv4), not 0x03 (hostname) or 0x04 (IPv6) - mOutBufferLength -= packetp->getSize(); - packet_size = packetp->getSize(); + proxywrap_t * header = static_cast<proxywrap_t*>(static_cast<void*>(buffer)); + LLHost sender; + sender.setAddress(header->addr); + sender.setPort(ntohs(header->port)); - status = sendPacketImpl(h_socket, packetp->getData(), packet_size, packetp->getHost()); + packet_size -= SOCKS_HEADER_SIZE; // The unwrapped packet size + packet->init(buffer + SOCKS_HEADER_SIZE, packet_size, sender); - delete packetp; - // Update the throttle - mOutThrottle.throttleOverflow(packet_size * 8.f); + mHeadIndex = (mHeadIndex + 1) % (S16)(mPacketRing.size()); + if (mNumBufferedPackets < MAX_BUFFER_RING_SIZE) + { + ++mNumBufferedPackets; + mNumBufferedBytes += packet_size; + } + else + { + // we overwrote an older packet + mNumBufferedBytes += packet_size - old_packet_size; + } } else { - // If the queue's empty, we can just send this packet right away. - status = sendPacketImpl(h_socket, send_buffer, buf_size, host ); - packet_size = buf_size; - - // Update the throttle - mOutThrottle.throttleOverflow(packet_size * 8.f); - - // This was the packet we're sending now, there are no other packets - // that we need to send - return status; + packet_size = 0; } - - } - - // We haven't sent the incoming packet, add it to the queue - if (mOutBufferLength + buf_size > mMaxBufferLength) - { - // Nuke this packet, we overflowed the buffer. - // Toss it. - LL_WARNS() << "Throwing away outbound packet, overflowing buffer" << LL_ENDL; } - else + } + else + { + packet->init(socket); + packet_size = packet->getSize(); + if (packet_size > 0) { - static LLTimer queue_timer; - if ((mOutBufferLength > 4192) && queue_timer.getElapsedTimeF32() > 1.f) + mActualBytesIn += packet_size; + + mHeadIndex = (mHeadIndex + 1) % (S16)(mPacketRing.size()); + if (mNumBufferedPackets < MAX_BUFFER_RING_SIZE) { - // Add it to the queue - LL_INFOS() << "Outbound packet queue " << mOutBufferLength << " bytes" << LL_ENDL; - queue_timer.reset(); + ++mNumBufferedPackets; + mNumBufferedBytes += packet_size; + } + else + { + // we overwrote an older packet + mNumBufferedBytes += packet_size - old_packet_size; } - packetp = new LLPacketBuffer(host, send_buffer, buf_size); - - mOutBufferLength += packetp->getSize(); - mSendQueue.push(packetp); } } - - return status; + return packet_size; } -bool LLPacketRing::sendPacketImpl(int h_socket, const char * send_buffer, S32 buf_size, LLHost host) +S32 LLPacketRing::drainSocket(S32 socket) { - - if (!LLProxy::isSOCKSProxyEnabled()) + // drain into buffer + S32 packet_size = 1; + S32 num_loops = 0; + S32 old_num_packets = mNumBufferedPackets; + while (packet_size > 0) { - return send_packet(h_socket, send_buffer, buf_size, host.getAddress(), host.getPort()); + packet_size = bufferInboundPacket(socket); + ++num_loops; } + S32 num_dropped_packets = (num_loops - 1 + old_num_packets) - mNumBufferedPackets; + if (num_dropped_packets > 0) + { + LL_WARNS("Messaging") << "dropped " << num_dropped_packets << " UDP packets" << LL_ENDL; + } + return (S32)(mNumBufferedPackets); +} - char headered_send_buffer[NET_BUFFER_SIZE + SOCKS_HEADER_SIZE]; +bool LLPacketRing::expandRing() +{ + // compute larger size + constexpr S16 BUFFER_RING_EXPANSION = 256; + S16 old_size = (S16)(mPacketRing.size()); + S16 new_size = llmin(old_size + BUFFER_RING_EXPANSION, MAX_BUFFER_RING_SIZE); + if (new_size == old_size) + { + // mPacketRing is already maxed out + return false; + } - proxywrap_t *socks_header = static_cast<proxywrap_t*>(static_cast<void*>(&headered_send_buffer)); - socks_header->rsv = 0; - socks_header->addr = host.getAddress(); - socks_header->port = htons(host.getPort()); - socks_header->atype = ADDRESS_IPV4; - socks_header->frag = 0; + // make a larger ring and copy packet pointers + std::vector<LLPacketBuffer*> new_ring(new_size, nullptr); + for (S16 i = 0; i < old_size; ++i) + { + S16 j = (mHeadIndex + i) % old_size; + new_ring[i] = mPacketRing[j]; + } - memcpy(headered_send_buffer + SOCKS_HEADER_SIZE, send_buffer, buf_size); + // allocate new packets for the remainder of new_ring + LLHost invalid_host; + for (S16 i = old_size; i < new_size; ++i) + { + new_ring[i] = new LLPacketBuffer(invalid_host, nullptr, 0); + } - return send_packet( h_socket, - headered_send_buffer, - buf_size + SOCKS_HEADER_SIZE, - LLProxy::getInstance()->getUDPProxy().getAddress(), - LLProxy::getInstance()->getUDPProxy().getPort()); + // swap the rings and reset mHeadIndex + mPacketRing.swap(new_ring); + mHeadIndex = mNumBufferedPackets; + return true; } diff --git a/indra/llmessage/llpacketring.h b/indra/llmessage/llpacketring.h index f0e95f8524..0dff2c63b1 100644 --- a/indra/llmessage/llpacketring.h +++ b/indra/llmessage/llpacketring.h @@ -25,16 +25,14 @@ * $/LicenseInfo$ */ -#ifndef LL_LLPACKETRING_H -#define LL_LLPACKETRING_H +#pragma once -#include <queue> +#include <vector> #include "llhost.h" #include "llpacketbuffer.h" -#include "llproxy.h" #include "llthrottle.h" -#include "net.h" + class LLPacketRing { @@ -42,60 +40,65 @@ public: LLPacketRing(); ~LLPacketRing(); - void cleanup(); + // receive one packet: either buffered or from the socket + S32 receivePacket (S32 socket, char *datap); + + // send one packet + bool sendPacket(int h_socket, const char * send_buffer, S32 buf_size, LLHost host); + + // drains packets from socket and returns final mNumBufferedPackets + S32 drainSocket(S32 socket); void dropPackets(U32); void setDropPercentage (F32 percent_to_drop); - void setUseInThrottle(const bool use_throttle); - void setUseOutThrottle(const bool use_throttle); - void setInBandwidth(const F32 bps); - void setOutBandwidth(const F32 bps); - S32 receivePacket (S32 socket, char *datap); - S32 receiveFromRing (S32 socket, char *datap); - bool sendPacket(int h_socket, char * send_buffer, S32 buf_size, LLHost host); + inline LLHost getLastSender() const; + inline LLHost getLastReceivingInterface() const; - inline LLHost getLastSender(); - inline LLHost getLastReceivingInterface(); + S32 getActualInBytes() const { return mActualBytesIn; } + S32 getActualOutBytes() const { return mActualBytesOut; } + S32 getAndResetActualInBits() { S32 bits = mActualBytesIn * 8; mActualBytesIn = 0; return bits;} + S32 getAndResetActualOutBits() { S32 bits = mActualBytesOut * 8; mActualBytesOut = 0; return bits;} - S32 getAndResetActualInBits() { S32 bits = mActualBitsIn; mActualBitsIn = 0; return bits;} - S32 getAndResetActualOutBits() { S32 bits = mActualBitsOut; mActualBitsOut = 0; return bits;} + S32 getNumBufferedPackets() const { return (S32)(mNumBufferedPackets); } + S32 getNumBufferedBytes() const { return mNumBufferedBytes; } protected: - bool mUseInThrottle; - bool mUseOutThrottle; + // returns 'true' if we should intentionally drop a packet + bool computeDrop(); - // For simulating a lower-bandwidth connection - BPS - LLThrottle mInThrottle; - LLThrottle mOutThrottle; + // returns packet_size of received packet, zero or less if no packet found + S32 receiveOrDropPacket(S32 socket, char *datap, bool drop); + S32 receiveOrDropBufferedPacket(char *datap, bool drop); - S32 mActualBitsIn; - S32 mActualBitsOut; - S32 mMaxBufferLength; // How much data can we queue up before dropping data. - S32 mInBufferLength; // Current incoming buffer length - S32 mOutBufferLength; // Current outgoing buffer length + // returns packet_size of packet buffered + S32 bufferInboundPacket(S32 socket); - F32 mDropPercentage; // % of packets to drop - U32 mPacketsToDrop; // drop next n packets + // returns 'true' if ring was expanded + bool expandRing(); - std::queue<LLPacketBuffer *> mReceiveQueue; - std::queue<LLPacketBuffer *> mSendQueue; +protected: + std::vector<LLPacketBuffer*> mPacketRing; + S16 mHeadIndex { 0 }; + S16 mNumBufferedPackets { 0 }; + S32 mNumBufferedBytes { 0 }; + + S32 mActualBytesIn { 0 }; + S32 mActualBytesOut { 0 }; + F32 mDropPercentage { 0.0f }; // % of inbound packets to drop + U32 mPacketsToDrop { 0 }; // drop next inbound n packets + // These are the sender and receiving_interface for the last packet delivered by receivePacket() LLHost mLastSender; LLHost mLastReceivingIF; - -private: - bool sendPacketImpl(int h_socket, const char * send_buffer, S32 buf_size, LLHost host); }; -inline LLHost LLPacketRing::getLastSender() +inline LLHost LLPacketRing::getLastSender() const { return mLastSender; } -inline LLHost LLPacketRing::getLastReceivingInterface() +inline LLHost LLPacketRing::getLastReceivingInterface() const { return mLastReceivingIF; } - -#endif diff --git a/indra/llmessage/message.cpp b/indra/llmessage/message.cpp index cfa5178fc6..c130b7a6db 100644 --- a/indra/llmessage/message.cpp +++ b/indra/llmessage/message.cpp @@ -656,8 +656,7 @@ bool LLMessageSystem::checkMessages(LockMessageChecker&, S64 frame_count ) // UseCircuitCode is allowed in even from an invalid circuit, so that // we can toss circuits around. - if( - valid_packet && + else if ( !cdp && (mTemplateMessageReader->getMessageName() != _PREHASH_UseCircuitCode)) @@ -667,8 +666,7 @@ bool LLMessageSystem::checkMessages(LockMessageChecker&, S64 frame_count ) valid_packet = false; } - if( - valid_packet && + if ( valid_packet && cdp && !cdp->getTrusted() && mTemplateMessageReader->isTrusted()) @@ -680,7 +678,7 @@ bool LLMessageSystem::checkMessages(LockMessageChecker&, S64 frame_count ) valid_packet = false; } - if( valid_packet ) + if ( valid_packet ) { logValidMsg(cdp, host, recv_reliable, recv_resent, acks>0 ); valid_packet = mTemplateMessageReader->readMessage(buffer, host); @@ -821,6 +819,11 @@ void LLMessageSystem::processAcks(LockMessageChecker&, F32 collect_time) } } +S32 LLMessageSystem::drainUdpSocket() +{ + return mPacketRing.drainSocket(mSocket); +} + void LLMessageSystem::copyMessageReceivedToSend() { // NOTE: babbage: switch builder to match reader to avoid diff --git a/indra/llmessage/message.h b/indra/llmessage/message.h index b4b0d94021..1844d5e7cd 100644 --- a/indra/llmessage/message.h +++ b/indra/llmessage/message.h @@ -417,6 +417,9 @@ public: bool checkMessages(LockMessageChecker&, S64 frame_count = 0 ); void processAcks(LockMessageChecker&, F32 collect_time = 0.f); + // returns total number of buffered packets after the drain + S32 drainUdpSocket(); + bool isMessageFast(const char *msg); bool isMessage(const char *msg) { diff --git a/indra/llmessage/net.cpp b/indra/llmessage/net.cpp index f153c938cf..2be5a9e5b6 100644 --- a/indra/llmessage/net.cpp +++ b/indra/llmessage/net.cpp @@ -76,14 +76,8 @@ static U32 gsnReceivingIFAddr = INVALID_HOST_IP_ADDRESS; // Address to which dat const char* LOOPBACK_ADDRESS_STRING = "127.0.0.1"; const char* BROADCAST_ADDRESS_STRING = "255.255.255.255"; -#if LL_DARWIN - // macOS returns an error when trying to set these to 400000. Smaller values succeed. - const int SEND_BUFFER_SIZE = 200000; - const int RECEIVE_BUFFER_SIZE = 200000; -#else // LL_DARWIN - const int SEND_BUFFER_SIZE = 400000; - const int RECEIVE_BUFFER_SIZE = 400000; -#endif // LL_DARWIN +const int SEND_BUFFER_SIZE = 200000; +const int RECEIVE_BUFFER_SIZE = 800000; // universal functions (cross-platform) diff --git a/indra/newview/llappviewer.cpp b/indra/newview/llappviewer.cpp index 52bea875d9..ba474d3f39 100644 --- a/indra/newview/llappviewer.cpp +++ b/indra/newview/llappviewer.cpp @@ -350,8 +350,6 @@ LLVector3 gRelativeWindVec(0.0, 0.0, 0.0); U32 gPacketsIn = 0; -bool gPrintMessagesThisFrame = false; - bool gRandomizeFramerate = false; bool gPeriodicSlowFrame = false; @@ -1495,9 +1493,9 @@ bool LLAppViewer::doFrame() { LL_PROFILE_ZONE_NAMED_CATEGORY_APP("df pauseMainloopTimeout"); - pingMainloopTimeout("Main:Sleep"); + pingMainloopTimeout("Main:Sleep"); - pauseMainloopTimeout(); + pauseMainloopTimeout(); } // Sleep and run background threads @@ -5217,12 +5215,9 @@ void LLAppViewer::idleNameCache() // Handle messages, and all message related stuff // -#define TIME_THROTTLE_MESSAGES -#ifdef TIME_THROTTLE_MESSAGES -#define CHECK_MESSAGES_DEFAULT_MAX_TIME .020f // 50 ms = 50 fps (just for messages!) +constexpr F32 CHECK_MESSAGES_DEFAULT_MAX_TIME = 0.020f; // 50 ms = 50 fps (just for messages!) static F32 CheckMessagesMaxTime = CHECK_MESSAGES_DEFAULT_MAX_TIME; -#endif static LLTrace::BlockTimerStatHandle FTM_IDLE_NETWORK("Idle Network"); static LLTrace::BlockTimerStatHandle FTM_MESSAGE_ACKS("Message Acks"); @@ -5249,6 +5244,7 @@ void LLAppViewer::idleNetwork() F32 total_time = 0.0f; { + bool needs_drain = false; LockMessageChecker lmc(gMessageSystem); while (lmc.checkAllMessages(frame_count, gServicePump)) { @@ -5265,50 +5261,41 @@ void LLAppViewer::idleNetwork() if (total_decoded > MESSAGE_MAX_PER_FRAME) { + needs_drain = true; break; } -#ifdef TIME_THROTTLE_MESSAGES // Prevent slow packets from completely destroying the frame rate. // This usually happens due to clumps of avatars taking huge amount // of network processing time (which needs to be fixed, but this is // a good limit anyway). total_time = check_message_timer.getElapsedTimeF32(); if (total_time >= CheckMessagesMaxTime) + { + needs_drain = true; break; -#endif + } + } + if (needs_drain || gMessageSystem->mPacketRing.getNumBufferedPackets() > 0) + { + // Rather than allow packets to silently backup on the socket + // we drain them into our own buffer so we know how many exist. + S32 num_buffered_packets = gMessageSystem->drainUdpSocket(); + if (num_buffered_packets > 0) + { + // Increase CheckMessagesMaxTime so that we will eventually catch up + CheckMessagesMaxTime *= 1.035f; // 3.5% ~= 2x in 20 frames, ~8x in 60 frames + } + } + else + { + // Reset CheckMessagesMaxTime to default value + CheckMessagesMaxTime = CHECK_MESSAGES_DEFAULT_MAX_TIME; } // Handle per-frame message system processing. lmc.processAcks(gSavedSettings.getF32("AckCollectTime")); } - -#ifdef TIME_THROTTLE_MESSAGES - if (total_time >= CheckMessagesMaxTime) - { - // Increase CheckMessagesMaxTime so that we will eventually catch up - CheckMessagesMaxTime *= 1.035f; // 3.5% ~= x2 in 20 frames, ~8x in 60 frames - } - else - { - // Reset CheckMessagesMaxTime to default value - CheckMessagesMaxTime = CHECK_MESSAGES_DEFAULT_MAX_TIME; - } -#endif - - // Decode enqueued messages... - S32 remaining_possible_decodes = MESSAGE_MAX_PER_FRAME - total_decoded; - - if( remaining_possible_decodes <= 0 ) - { - LL_INFOS() << "Maxed out number of messages per frame at " << MESSAGE_MAX_PER_FRAME << LL_ENDL; - } - - if (gPrintMessagesThisFrame) - { - LL_INFOS() << "Decoded " << total_decoded << " msgs this frame!" << LL_ENDL; - gPrintMessagesThisFrame = false; - } } add(LLStatViewer::NUM_NEW_OBJECTS, gObjectList.mNumNewObjects); diff --git a/indra/newview/llappviewer.h b/indra/newview/llappviewer.h index 4ce4259ed8..a253f06d14 100644 --- a/indra/newview/llappviewer.h +++ b/indra/newview/llappviewer.h @@ -400,7 +400,6 @@ extern std::string gLastVersionChannel; extern LLVector3 gWindVec; extern LLVector3 gRelativeWindVec; extern U32 gPacketsIn; -extern bool gPrintMessagesThisFrame; extern bool gRandomizeFramerate; extern bool gPeriodicSlowFrame; diff --git a/indra/newview/llstartup.cpp b/indra/newview/llstartup.cpp index b32b80331a..5f6c5e1e15 100644 --- a/indra/newview/llstartup.cpp +++ b/indra/newview/llstartup.cpp @@ -623,21 +623,6 @@ bool idle_startup() F32 dropPercent = gSavedSettings.getF32("PacketDropPercentage"); msg->mPacketRing.setDropPercentage(dropPercent); - - F32 inBandwidth = gSavedSettings.getF32("InBandwidth"); - F32 outBandwidth = gSavedSettings.getF32("OutBandwidth"); - if (inBandwidth != 0.f) - { - LL_DEBUGS("AppInit") << "Setting packetring incoming bandwidth to " << inBandwidth << LL_ENDL; - msg->mPacketRing.setUseInThrottle(true); - msg->mPacketRing.setInBandwidth(inBandwidth); - } - if (outBandwidth != 0.f) - { - LL_DEBUGS("AppInit") << "Setting packetring outgoing bandwidth to " << outBandwidth << LL_ENDL; - msg->mPacketRing.setUseOutThrottle(true); - msg->mPacketRing.setOutBandwidth(outBandwidth); - } } LL_INFOS("AppInit") << "Message System Initialized." << LL_ENDL; |