From 6d0b0a77eed584c6bcf77f6cd5fdbdbf89a987d7 Mon Sep 17 00:00:00 2001 From: Andrew Meadows Date: Tue, 18 Feb 2025 11:38:52 -0800 Subject: drain UDP socket to avoid dropped packets (#3565) drain UDP socket in idleNetwork() to avoid dropped packets --- indra/llmessage/llpacketbuffer.cpp | 26 ++- indra/llmessage/llpacketbuffer.h | 12 +- indra/llmessage/llpacketring.cpp | 444 +++++++++++++++++-------------------- indra/llmessage/llpacketring.h | 79 +++---- indra/llmessage/message.cpp | 13 +- indra/llmessage/message.h | 3 + indra/llmessage/net.cpp | 10 +- 7 files changed, 286 insertions(+), 301 deletions(-) (limited to 'indra/llmessage') diff --git a/indra/llmessage/llpacketbuffer.cpp b/indra/llmessage/llpacketbuffer.cpp index dc5c7a73cb..0b04a560be 100644 --- a/indra/llmessage/llpacketbuffer.cpp +++ b/indra/llmessage/llpacketbuffer.cpp @@ -32,8 +32,6 @@ #include "lltimer.h" #include "llhost.h" -/////////////////////////////////////////////////////////// - LLPacketBuffer::LLPacketBuffer(const LLHost &host, const char *datap, const S32 size) : mHost(host) { mSize = 0; @@ -41,7 +39,7 @@ LLPacketBuffer::LLPacketBuffer(const LLHost &host, const char *datap, const S32 if (size > NET_BUFFER_SIZE) { - LL_ERRS() << "Sending packet > " << NET_BUFFER_SIZE << " of size " << size << LL_ENDL; + LL_ERRS() << "Constructing packet with size=" << size << " > " << NET_BUFFER_SIZE << LL_ENDL; } else { @@ -51,7 +49,6 @@ LLPacketBuffer::LLPacketBuffer(const LLHost &host, const char *datap, const S32 mSize = size; } } - } LLPacketBuffer::LLPacketBuffer (S32 hSocket) @@ -59,18 +56,29 @@ LLPacketBuffer::LLPacketBuffer (S32 hSocket) init(hSocket); } -/////////////////////////////////////////////////////////// - LLPacketBuffer::~LLPacketBuffer () { } -/////////////////////////////////////////////////////////// - -void LLPacketBuffer::init (S32 hSocket) +void LLPacketBuffer::init(S32 hSocket) { mSize = receive_packet(hSocket, mData); mHost = ::get_sender(); mReceivingIF = ::get_receiving_interface(); } +void LLPacketBuffer::init(const char* buffer, S32 data_size, const LLHost& host) +{ + if (data_size > NET_BUFFER_SIZE) + { + LL_ERRS() << "Initializing packet with size=" << data_size << " > " << NET_BUFFER_SIZE << LL_ENDL; + } + else + { + memcpy(mData, buffer, data_size); + mSize = data_size; + mHost = host; + mReceivingIF = ::get_receiving_interface(); + } +} + diff --git a/indra/llmessage/llpacketbuffer.h b/indra/llmessage/llpacketbuffer.h index a2d2973fb0..ac4012d330 100644 --- a/indra/llmessage/llpacketbuffer.h +++ b/indra/llmessage/llpacketbuffer.h @@ -35,20 +35,22 @@ class LLPacketBuffer { public: LLPacketBuffer(const LLHost &host, const char *datap, const S32 size); - LLPacketBuffer(S32 hSocket); // receive a packet + LLPacketBuffer(S32 hSocket); // receive a packet ~LLPacketBuffer(); S32 getSize() const { return mSize; } const char *getData() const { return mData; } LLHost getHost() const { return mHost; } LLHost getReceivingInterface() const { return mReceivingIF; } + void init(S32 hSocket); + void init(const char* buffer, S32 data_size, const LLHost& host); protected: - char mData[NET_BUFFER_SIZE]; // packet data /* Flawfinder : ignore */ - S32 mSize; // size of buffer in bytes - LLHost mHost; // source/dest IP and port - LLHost mReceivingIF; // source/dest IP and port + char mData[NET_BUFFER_SIZE]; // packet data /* Flawfinder : ignore */ + S32 mSize; // size of buffer in bytes + LLHost mHost; // source/dest IP and port + LLHost mReceivingIF; // source/dest IP and port }; #endif diff --git a/indra/llmessage/llpacketring.cpp b/indra/llmessage/llpacketring.cpp index be838770a8..ae5a2168db 100644 --- a/indra/llmessage/llpacketring.cpp +++ b/indra/llmessage/llpacketring.cpp @@ -1,6 +1,6 @@ /** * @file llpacketring.cpp - * @brief implementation of LLPacketRing class for a packet. + * @brief implementation of LLPacketRing class. * * $LicenseInfo:firstyear=2001&license=viewerlgpl$ * Second Life Viewer Source Code @@ -43,329 +43,301 @@ #include "message.h" #include "u64.h" -/////////////////////////////////////////////////////////// -LLPacketRing::LLPacketRing () : - mUseInThrottle(false), - mUseOutThrottle(false), - mInThrottle(256000.f), - mOutThrottle(64000.f), - mActualBitsIn(0), - mActualBitsOut(0), - mMaxBufferLength(64000), - mInBufferLength(0), - mOutBufferLength(0), - mDropPercentage(0.0f), - mPacketsToDrop(0x0) +constexpr S16 MAX_BUFFER_RING_SIZE = 1024; +constexpr S16 DEFAULT_BUFFER_RING_SIZE = 256; + +LLPacketRing::LLPacketRing () + : mPacketRing(DEFAULT_BUFFER_RING_SIZE, nullptr) { + LLHost invalid_host; + for (size_t i = 0; i < mPacketRing.size(); ++i) + { + mPacketRing[i] = new LLPacketBuffer(invalid_host, nullptr, 0); + } } -/////////////////////////////////////////////////////////// LLPacketRing::~LLPacketRing () { - cleanup(); + for (auto packet : mPacketRing) + { + delete packet; + } + mPacketRing.clear(); + mNumBufferedPackets = 0; + mNumBufferedBytes = 0; + mHeadIndex = 0; } -/////////////////////////////////////////////////////////// -void LLPacketRing::cleanup () +S32 LLPacketRing::receivePacket (S32 socket, char *datap) { - LLPacketBuffer *packetp; + bool drop = computeDrop(); + return (mNumBufferedPackets > 0) ? + receiveOrDropBufferedPacket(datap, drop) : + receiveOrDropPacket(socket, datap, drop); +} - while (!mReceiveQueue.empty()) +bool send_packet_helper(int socket, const char * datap, S32 data_size, LLHost host) +{ + if (!LLProxy::isSOCKSProxyEnabled()) { - packetp = mReceiveQueue.front(); - delete packetp; - mReceiveQueue.pop(); + return send_packet(socket, datap, data_size, host.getAddress(), host.getPort()); } - while (!mSendQueue.empty()) - { - packetp = mSendQueue.front(); - delete packetp; - mSendQueue.pop(); - } -} + char headered_send_buffer[NET_BUFFER_SIZE + SOCKS_HEADER_SIZE]; -/////////////////////////////////////////////////////////// -void LLPacketRing::dropPackets (U32 num_to_drop) -{ - mPacketsToDrop += num_to_drop; -} + proxywrap_t *socks_header = static_cast(static_cast(&headered_send_buffer)); + socks_header->rsv = 0; + socks_header->addr = host.getAddress(); + socks_header->port = htons(host.getPort()); + socks_header->atype = ADDRESS_IPV4; + socks_header->frag = 0; -/////////////////////////////////////////////////////////// -void LLPacketRing::setDropPercentage (F32 percent_to_drop) -{ - mDropPercentage = percent_to_drop; -} + memcpy(headered_send_buffer + SOCKS_HEADER_SIZE, datap, data_size); -void LLPacketRing::setUseInThrottle(const bool use_throttle) -{ - mUseInThrottle = use_throttle; + return send_packet( socket, + headered_send_buffer, + data_size + SOCKS_HEADER_SIZE, + LLProxy::getInstance()->getUDPProxy().getAddress(), + LLProxy::getInstance()->getUDPProxy().getPort()); } -void LLPacketRing::setUseOutThrottle(const bool use_throttle) +bool LLPacketRing::sendPacket(int socket, const char * datap, S32 data_size, LLHost host) { - mUseOutThrottle = use_throttle; + mActualBytesOut += data_size; + return send_packet_helper(socket, datap, data_size, host); } -void LLPacketRing::setInBandwidth(const F32 bps) +void LLPacketRing::dropPackets (U32 num_to_drop) { - mInThrottle.setRate(bps); + mPacketsToDrop += num_to_drop; } -void LLPacketRing::setOutBandwidth(const F32 bps) +void LLPacketRing::setDropPercentage (F32 percent_to_drop) { - mOutThrottle.setRate(bps); + mDropPercentage = percent_to_drop; } -/////////////////////////////////////////////////////////// -S32 LLPacketRing::receiveFromRing (S32 socket, char *datap) -{ - if (mInThrottle.checkOverflow(0)) - { - // We don't have enough bandwidth, don't give them a packet. - return 0; - } - - LLPacketBuffer *packetp = NULL; - if (mReceiveQueue.empty()) +bool LLPacketRing::computeDrop() +{ + bool drop= (mDropPercentage > 0.0f && (ll_frand(100.f) < mDropPercentage)); + if (drop) { - // No packets on the queue, don't give them any. - return 0; + ++mPacketsToDrop; } - - S32 packet_size = 0; - packetp = mReceiveQueue.front(); - mReceiveQueue.pop(); - packet_size = packetp->getSize(); - if (packetp->getData() != NULL) + if (mPacketsToDrop > 0) { - memcpy(datap, packetp->getData(), packet_size); /*Flawfinder: ignore*/ + --mPacketsToDrop; + drop = true; } - // need to set sender IP/port!! - mLastSender = packetp->getHost(); - mLastReceivingIF = packetp->getReceivingInterface(); - delete packetp; - - this->mInBufferLength -= packet_size; - - // Adjust the throttle - mInThrottle.throttleOverflow(packet_size * 8.f); - return packet_size; + return drop; } -/////////////////////////////////////////////////////////// -S32 LLPacketRing::receivePacket (S32 socket, char *datap) +S32 LLPacketRing::receiveOrDropPacket(S32 socket, char *datap, bool drop) { S32 packet_size = 0; - // If using the throttle, simulate a limited size input buffer. - if (mUseInThrottle) + // pull straight from socket + if (LLProxy::isSOCKSProxyEnabled()) { - bool done = false; - - // push any current net packet (if any) onto delay ring - while (!done) + char buffer[NET_BUFFER_SIZE + SOCKS_HEADER_SIZE]; /* Flawfinder ignore */ + packet_size = receive_packet(socket, buffer); + if (packet_size > 0) { - LLPacketBuffer *packetp; - packetp = new LLPacketBuffer(socket); - - if (packetp->getSize()) - { - mActualBitsIn += packetp->getSize() * 8; - - // Fake packet loss - if (mDropPercentage && (ll_frand(100.f) < mDropPercentage)) - { - mPacketsToDrop++; - } - - if (mPacketsToDrop) - { - delete packetp; - packetp = NULL; - packet_size = 0; - mPacketsToDrop--; - } - } + mActualBytesIn += packet_size; + } - // If we faked packet loss, then we don't have a packet - // to use for buffer overflow testing - if (packetp) + if (packet_size > SOCKS_HEADER_SIZE) + { + if (drop) { - if (mInBufferLength + packetp->getSize() > mMaxBufferLength) - { - // Toss it. - LL_WARNS() << "Throwing away packet, overflowing buffer" << LL_ENDL; - delete packetp; - packetp = NULL; - } - else if (packetp->getSize()) - { - mReceiveQueue.push(packetp); - mInBufferLength += packetp->getSize(); - } - else - { - delete packetp; - packetp = NULL; - done = true; - } + packet_size = 0; } else - { - // No packetp, keep going? - no packetp == faked packet loss - } - } - - // Now, grab data off of the receive queue according to our - // throttled bandwidth settings. - packet_size = receiveFromRing(socket, datap); - } - else - { - // no delay, pull straight from net - if (LLProxy::isSOCKSProxyEnabled()) - { - U8 buffer[NET_BUFFER_SIZE + SOCKS_HEADER_SIZE]; - packet_size = receive_packet(socket, static_cast(static_cast(buffer))); - - if (packet_size > SOCKS_HEADER_SIZE) { // *FIX We are assuming ATYP is 0x01 (IPv4), not 0x03 (hostname) or 0x04 (IPv6) - memcpy(datap, buffer + SOCKS_HEADER_SIZE, packet_size - SOCKS_HEADER_SIZE); + packet_size -= SOCKS_HEADER_SIZE; // The unwrapped packet size + memcpy(datap, buffer + SOCKS_HEADER_SIZE, packet_size); proxywrap_t * header = static_cast(static_cast(buffer)); mLastSender.setAddress(header->addr); mLastSender.setPort(ntohs(header->port)); - - packet_size -= SOCKS_HEADER_SIZE; // The unwrapped packet size - } - else - { - packet_size = 0; + mLastReceivingIF = ::get_receiving_interface(); } } else { - packet_size = receive_packet(socket, datap); - mLastSender = ::get_sender(); + packet_size = 0; } - - mLastReceivingIF = ::get_receiving_interface(); - - if (packet_size) // did we actually get a packet? + } + else + { + packet_size = receive_packet(socket, datap); + if (packet_size > 0) { - if (mDropPercentage && (ll_frand(100.f) < mDropPercentage)) + mActualBytesIn += packet_size; + if (drop) { - mPacketsToDrop++; + packet_size = 0; } - - if (mPacketsToDrop) + else { - packet_size = 0; - mPacketsToDrop--; + mLastSender = ::get_sender(); + mLastReceivingIF = ::get_receiving_interface(); } } } - return packet_size; } -bool LLPacketRing::sendPacket(int h_socket, char * send_buffer, S32 buf_size, LLHost host) +S32 LLPacketRing::receiveOrDropBufferedPacket(char *datap, bool drop) { - bool status = true; - if (!mUseOutThrottle) + assert(mNumBufferedPackets > 0); + S32 packet_size = 0; + + S16 ring_size = (S16)(mPacketRing.size()); + S16 packet_index = (mHeadIndex + ring_size - mNumBufferedPackets) % ring_size; + LLPacketBuffer* packet = mPacketRing[packet_index]; + packet_size = packet->getSize(); + mLastSender = packet->getHost(); + mLastReceivingIF = packet->getReceivingInterface(); + + --mNumBufferedPackets; + mNumBufferedBytes -= packet_size; + if (mNumBufferedPackets == 0) { - return sendPacketImpl(h_socket, send_buffer, buf_size, host ); + assert(mNumBufferedBytes == 0); + } + + if (!drop) + { + assert(packet_size > 0); + memcpy(datap, packet->getData(), packet_size); } else { - mActualBitsOut += buf_size * 8; - LLPacketBuffer *packetp = NULL; - // See if we've got enough throttle to send a packet. - while (!mOutThrottle.checkOverflow(0.f)) - { - // While we have enough bandwidth, send a packet from the queue or the current packet + packet_size = 0; + } + return packet_size; +} + +S32 LLPacketRing::bufferInboundPacket(S32 socket) +{ + if (mNumBufferedPackets == mPacketRing.size() && mNumBufferedPackets < MAX_BUFFER_RING_SIZE) + { + expandRing(); + } - S32 packet_size = 0; - if (!mSendQueue.empty()) + LLPacketBuffer* packet = mPacketRing[mHeadIndex]; + S32 old_packet_size = packet->getSize(); + S32 packet_size = 0; + if (LLProxy::isSOCKSProxyEnabled()) + { + char buffer[NET_BUFFER_SIZE + SOCKS_HEADER_SIZE]; /* Flawfinder ignore */ + packet_size = receive_packet(socket, buffer); + if (packet_size > 0) + { + mActualBytesIn += packet_size; + if (packet_size > SOCKS_HEADER_SIZE) { - // Send a packet off of the queue - LLPacketBuffer *packetp = mSendQueue.front(); - mSendQueue.pop(); + // *FIX We are assuming ATYP is 0x01 (IPv4), not 0x03 (hostname) or 0x04 (IPv6) - mOutBufferLength -= packetp->getSize(); - packet_size = packetp->getSize(); + proxywrap_t * header = static_cast(static_cast(buffer)); + LLHost sender; + sender.setAddress(header->addr); + sender.setPort(ntohs(header->port)); - status = sendPacketImpl(h_socket, packetp->getData(), packet_size, packetp->getHost()); + packet_size -= SOCKS_HEADER_SIZE; // The unwrapped packet size + packet->init(buffer + SOCKS_HEADER_SIZE, packet_size, sender); - delete packetp; - // Update the throttle - mOutThrottle.throttleOverflow(packet_size * 8.f); + mHeadIndex = (mHeadIndex + 1) % (S16)(mPacketRing.size()); + if (mNumBufferedPackets < MAX_BUFFER_RING_SIZE) + { + ++mNumBufferedPackets; + mNumBufferedBytes += packet_size; + } + else + { + // we overwrote an older packet + mNumBufferedBytes += packet_size - old_packet_size; + } } else { - // If the queue's empty, we can just send this packet right away. - status = sendPacketImpl(h_socket, send_buffer, buf_size, host ); - packet_size = buf_size; - - // Update the throttle - mOutThrottle.throttleOverflow(packet_size * 8.f); - - // This was the packet we're sending now, there are no other packets - // that we need to send - return status; + packet_size = 0; } - - } - - // We haven't sent the incoming packet, add it to the queue - if (mOutBufferLength + buf_size > mMaxBufferLength) - { - // Nuke this packet, we overflowed the buffer. - // Toss it. - LL_WARNS() << "Throwing away outbound packet, overflowing buffer" << LL_ENDL; } - else + } + else + { + packet->init(socket); + packet_size = packet->getSize(); + if (packet_size > 0) { - static LLTimer queue_timer; - if ((mOutBufferLength > 4192) && queue_timer.getElapsedTimeF32() > 1.f) + mActualBytesIn += packet_size; + + mHeadIndex = (mHeadIndex + 1) % (S16)(mPacketRing.size()); + if (mNumBufferedPackets < MAX_BUFFER_RING_SIZE) { - // Add it to the queue - LL_INFOS() << "Outbound packet queue " << mOutBufferLength << " bytes" << LL_ENDL; - queue_timer.reset(); + ++mNumBufferedPackets; + mNumBufferedBytes += packet_size; + } + else + { + // we overwrote an older packet + mNumBufferedBytes += packet_size - old_packet_size; } - packetp = new LLPacketBuffer(host, send_buffer, buf_size); - - mOutBufferLength += packetp->getSize(); - mSendQueue.push(packetp); } } - - return status; + return packet_size; } -bool LLPacketRing::sendPacketImpl(int h_socket, const char * send_buffer, S32 buf_size, LLHost host) +S32 LLPacketRing::drainSocket(S32 socket) { - - if (!LLProxy::isSOCKSProxyEnabled()) + // drain into buffer + S32 packet_size = 1; + S32 num_loops = 0; + S32 old_num_packets = mNumBufferedPackets; + while (packet_size > 0) { - return send_packet(h_socket, send_buffer, buf_size, host.getAddress(), host.getPort()); + packet_size = bufferInboundPacket(socket); + ++num_loops; } + S32 num_dropped_packets = (num_loops - 1 + old_num_packets) - mNumBufferedPackets; + if (num_dropped_packets > 0) + { + LL_WARNS("Messaging") << "dropped " << num_dropped_packets << " UDP packets" << LL_ENDL; + } + return (S32)(mNumBufferedPackets); +} - char headered_send_buffer[NET_BUFFER_SIZE + SOCKS_HEADER_SIZE]; +bool LLPacketRing::expandRing() +{ + // compute larger size + constexpr S16 BUFFER_RING_EXPANSION = 256; + S16 old_size = (S16)(mPacketRing.size()); + S16 new_size = llmin(old_size + BUFFER_RING_EXPANSION, MAX_BUFFER_RING_SIZE); + if (new_size == old_size) + { + // mPacketRing is already maxed out + return false; + } - proxywrap_t *socks_header = static_cast(static_cast(&headered_send_buffer)); - socks_header->rsv = 0; - socks_header->addr = host.getAddress(); - socks_header->port = htons(host.getPort()); - socks_header->atype = ADDRESS_IPV4; - socks_header->frag = 0; + // make a larger ring and copy packet pointers + std::vector new_ring(new_size, nullptr); + for (S16 i = 0; i < old_size; ++i) + { + S16 j = (mHeadIndex + i) % old_size; + new_ring[i] = mPacketRing[j]; + } - memcpy(headered_send_buffer + SOCKS_HEADER_SIZE, send_buffer, buf_size); + // allocate new packets for the remainder of new_ring + LLHost invalid_host; + for (S16 i = old_size; i < new_size; ++i) + { + new_ring[i] = new LLPacketBuffer(invalid_host, nullptr, 0); + } - return send_packet( h_socket, - headered_send_buffer, - buf_size + SOCKS_HEADER_SIZE, - LLProxy::getInstance()->getUDPProxy().getAddress(), - LLProxy::getInstance()->getUDPProxy().getPort()); + // swap the rings and reset mHeadIndex + mPacketRing.swap(new_ring); + mHeadIndex = mNumBufferedPackets; + return true; } diff --git a/indra/llmessage/llpacketring.h b/indra/llmessage/llpacketring.h index f0e95f8524..0dff2c63b1 100644 --- a/indra/llmessage/llpacketring.h +++ b/indra/llmessage/llpacketring.h @@ -25,16 +25,14 @@ * $/LicenseInfo$ */ -#ifndef LL_LLPACKETRING_H -#define LL_LLPACKETRING_H +#pragma once -#include +#include #include "llhost.h" #include "llpacketbuffer.h" -#include "llproxy.h" #include "llthrottle.h" -#include "net.h" + class LLPacketRing { @@ -42,60 +40,65 @@ public: LLPacketRing(); ~LLPacketRing(); - void cleanup(); + // receive one packet: either buffered or from the socket + S32 receivePacket (S32 socket, char *datap); + + // send one packet + bool sendPacket(int h_socket, const char * send_buffer, S32 buf_size, LLHost host); + + // drains packets from socket and returns final mNumBufferedPackets + S32 drainSocket(S32 socket); void dropPackets(U32); void setDropPercentage (F32 percent_to_drop); - void setUseInThrottle(const bool use_throttle); - void setUseOutThrottle(const bool use_throttle); - void setInBandwidth(const F32 bps); - void setOutBandwidth(const F32 bps); - S32 receivePacket (S32 socket, char *datap); - S32 receiveFromRing (S32 socket, char *datap); - bool sendPacket(int h_socket, char * send_buffer, S32 buf_size, LLHost host); + inline LLHost getLastSender() const; + inline LLHost getLastReceivingInterface() const; - inline LLHost getLastSender(); - inline LLHost getLastReceivingInterface(); + S32 getActualInBytes() const { return mActualBytesIn; } + S32 getActualOutBytes() const { return mActualBytesOut; } + S32 getAndResetActualInBits() { S32 bits = mActualBytesIn * 8; mActualBytesIn = 0; return bits;} + S32 getAndResetActualOutBits() { S32 bits = mActualBytesOut * 8; mActualBytesOut = 0; return bits;} - S32 getAndResetActualInBits() { S32 bits = mActualBitsIn; mActualBitsIn = 0; return bits;} - S32 getAndResetActualOutBits() { S32 bits = mActualBitsOut; mActualBitsOut = 0; return bits;} + S32 getNumBufferedPackets() const { return (S32)(mNumBufferedPackets); } + S32 getNumBufferedBytes() const { return mNumBufferedBytes; } protected: - bool mUseInThrottle; - bool mUseOutThrottle; + // returns 'true' if we should intentionally drop a packet + bool computeDrop(); - // For simulating a lower-bandwidth connection - BPS - LLThrottle mInThrottle; - LLThrottle mOutThrottle; + // returns packet_size of received packet, zero or less if no packet found + S32 receiveOrDropPacket(S32 socket, char *datap, bool drop); + S32 receiveOrDropBufferedPacket(char *datap, bool drop); - S32 mActualBitsIn; - S32 mActualBitsOut; - S32 mMaxBufferLength; // How much data can we queue up before dropping data. - S32 mInBufferLength; // Current incoming buffer length - S32 mOutBufferLength; // Current outgoing buffer length + // returns packet_size of packet buffered + S32 bufferInboundPacket(S32 socket); - F32 mDropPercentage; // % of packets to drop - U32 mPacketsToDrop; // drop next n packets + // returns 'true' if ring was expanded + bool expandRing(); - std::queue mReceiveQueue; - std::queue mSendQueue; +protected: + std::vector mPacketRing; + S16 mHeadIndex { 0 }; + S16 mNumBufferedPackets { 0 }; + S32 mNumBufferedBytes { 0 }; + + S32 mActualBytesIn { 0 }; + S32 mActualBytesOut { 0 }; + F32 mDropPercentage { 0.0f }; // % of inbound packets to drop + U32 mPacketsToDrop { 0 }; // drop next inbound n packets + // These are the sender and receiving_interface for the last packet delivered by receivePacket() LLHost mLastSender; LLHost mLastReceivingIF; - -private: - bool sendPacketImpl(int h_socket, const char * send_buffer, S32 buf_size, LLHost host); }; -inline LLHost LLPacketRing::getLastSender() +inline LLHost LLPacketRing::getLastSender() const { return mLastSender; } -inline LLHost LLPacketRing::getLastReceivingInterface() +inline LLHost LLPacketRing::getLastReceivingInterface() const { return mLastReceivingIF; } - -#endif diff --git a/indra/llmessage/message.cpp b/indra/llmessage/message.cpp index cfa5178fc6..c130b7a6db 100644 --- a/indra/llmessage/message.cpp +++ b/indra/llmessage/message.cpp @@ -656,8 +656,7 @@ bool LLMessageSystem::checkMessages(LockMessageChecker&, S64 frame_count ) // UseCircuitCode is allowed in even from an invalid circuit, so that // we can toss circuits around. - if( - valid_packet && + else if ( !cdp && (mTemplateMessageReader->getMessageName() != _PREHASH_UseCircuitCode)) @@ -667,8 +666,7 @@ bool LLMessageSystem::checkMessages(LockMessageChecker&, S64 frame_count ) valid_packet = false; } - if( - valid_packet && + if ( valid_packet && cdp && !cdp->getTrusted() && mTemplateMessageReader->isTrusted()) @@ -680,7 +678,7 @@ bool LLMessageSystem::checkMessages(LockMessageChecker&, S64 frame_count ) valid_packet = false; } - if( valid_packet ) + if ( valid_packet ) { logValidMsg(cdp, host, recv_reliable, recv_resent, acks>0 ); valid_packet = mTemplateMessageReader->readMessage(buffer, host); @@ -821,6 +819,11 @@ void LLMessageSystem::processAcks(LockMessageChecker&, F32 collect_time) } } +S32 LLMessageSystem::drainUdpSocket() +{ + return mPacketRing.drainSocket(mSocket); +} + void LLMessageSystem::copyMessageReceivedToSend() { // NOTE: babbage: switch builder to match reader to avoid diff --git a/indra/llmessage/message.h b/indra/llmessage/message.h index b4b0d94021..1844d5e7cd 100644 --- a/indra/llmessage/message.h +++ b/indra/llmessage/message.h @@ -417,6 +417,9 @@ public: bool checkMessages(LockMessageChecker&, S64 frame_count = 0 ); void processAcks(LockMessageChecker&, F32 collect_time = 0.f); + // returns total number of buffered packets after the drain + S32 drainUdpSocket(); + bool isMessageFast(const char *msg); bool isMessage(const char *msg) { diff --git a/indra/llmessage/net.cpp b/indra/llmessage/net.cpp index f153c938cf..2be5a9e5b6 100644 --- a/indra/llmessage/net.cpp +++ b/indra/llmessage/net.cpp @@ -76,14 +76,8 @@ static U32 gsnReceivingIFAddr = INVALID_HOST_IP_ADDRESS; // Address to which dat const char* LOOPBACK_ADDRESS_STRING = "127.0.0.1"; const char* BROADCAST_ADDRESS_STRING = "255.255.255.255"; -#if LL_DARWIN - // macOS returns an error when trying to set these to 400000. Smaller values succeed. - const int SEND_BUFFER_SIZE = 200000; - const int RECEIVE_BUFFER_SIZE = 200000; -#else // LL_DARWIN - const int SEND_BUFFER_SIZE = 400000; - const int RECEIVE_BUFFER_SIZE = 400000; -#endif // LL_DARWIN +const int SEND_BUFFER_SIZE = 200000; +const int RECEIVE_BUFFER_SIZE = 800000; // universal functions (cross-platform) -- cgit v1.2.3 From ecac92c60c1ca92aae0bb940c8a7952146c99df0 Mon Sep 17 00:00:00 2001 From: Andrey Kleshchev Date: Thu, 27 Feb 2025 16:39:34 +0200 Subject: #3627 Warn user about low memory on bad_alloc --- indra/llmessage/llcoproceduremanager.cpp | 7 ++++++- indra/llmessage/llcorehttputil.cpp | 10 +++++++++- 2 files changed, 15 insertions(+), 2 deletions(-) (limited to 'indra/llmessage') diff --git a/indra/llmessage/llcoproceduremanager.cpp b/indra/llmessage/llcoproceduremanager.cpp index 263670bdac..6a663a8e97 100644 --- a/indra/llmessage/llcoproceduremanager.cpp +++ b/indra/llmessage/llcoproceduremanager.cpp @@ -301,12 +301,12 @@ LLCoprocedurePool::LLCoprocedurePool(const std::string &poolName, size_t size): mPoolSize(size), mActiveCoprocsCount(0), mPending(0), - mPendingCoprocs(std::make_shared(LLCoprocedureManager::DEFAULT_QUEUE_SIZE)), mHTTPPolicy(LLCore::HttpRequest::DEFAULT_POLICY_ID), mCoroMapping() { try { + mPendingCoprocs = std::make_shared(LLCoprocedureManager::DEFAULT_QUEUE_SIZE); // store in our LLTempBoundListener so that when the LLCoprocedurePool is // destroyed, we implicitly disconnect from this LLEventPump // Monitores application status @@ -339,6 +339,11 @@ LLCoprocedurePool::LLCoprocedurePool(const std::string &poolName, size_t size): llassert(0); // Fix Me! Ignoring missing listener! } + catch (std::bad_alloc&) + { + LLError::LLUserWarningMsg::showOutOfMemory(); + LL_ERRS("CoProcMgr") << "Bad memory allocation in LLCoprocedurePool::LLCoprocedurePool!" << LL_ENDL; + } for (size_t count = 0; count < mPoolSize; ++count) { diff --git a/indra/llmessage/llcorehttputil.cpp b/indra/llmessage/llcorehttputil.cpp index 918a69be6f..992e145758 100644 --- a/indra/llmessage/llcorehttputil.cpp +++ b/indra/llmessage/llcorehttputil.cpp @@ -295,7 +295,15 @@ void HttpCoroHandler::onCompleted(LLCore::HttpHandle handle, LLCore::HttpRespons } else { - result = this->handleSuccess(response, status); + try + { + result = this->handleSuccess(response, status); + } + catch (std::bad_alloc&) + { + LLError::LLUserWarningMsg::showOutOfMemory(); + LL_ERRS("CoreHTTP") << "Failed to allocate memory for response handling." << LL_ENDL; + } } buildStatusEntry(response, status, result); -- cgit v1.2.3 From 3efe5b493442f2b90ffbb571d8fa24c8ab17bf5e Mon Sep 17 00:00:00 2001 From: Andrey Kleshchev Date: Tue, 4 Mar 2025 19:55:55 +0200 Subject: #3644 Fix new logging of packet drops hitting performance --- indra/llmessage/llpacketring.cpp | 16 +++++++++++++++- indra/llmessage/llpacketring.h | 5 +++++ indra/llmessage/message.cpp | 1 + 3 files changed, 21 insertions(+), 1 deletion(-) (limited to 'indra/llmessage') diff --git a/indra/llmessage/llpacketring.cpp b/indra/llmessage/llpacketring.cpp index ae5a2168db..470398152c 100644 --- a/indra/llmessage/llpacketring.cpp +++ b/indra/llmessage/llpacketring.cpp @@ -304,7 +304,7 @@ S32 LLPacketRing::drainSocket(S32 socket) S32 num_dropped_packets = (num_loops - 1 + old_num_packets) - mNumBufferedPackets; if (num_dropped_packets > 0) { - LL_WARNS("Messaging") << "dropped " << num_dropped_packets << " UDP packets" << LL_ENDL; + mNumDroppedPackets += num_dropped_packets; } return (S32)(mNumBufferedPackets); } @@ -341,3 +341,17 @@ bool LLPacketRing::expandRing() mHeadIndex = mNumBufferedPackets; return true; } + +void LLPacketRing::dumpPacketRingStats() +{ + mNumDroppedPacketsTotal += mNumDroppedPackets; + LL_INFOS("Messaging") << "Packet ring stats: " << std::endl + << "Buffered packets: " << mNumBufferedPackets << std::endl + << "Buffered bytes: " << mNumBufferedBytes << std::endl + << "Dropped packets current: " << mNumDroppedPackets << std::endl + << "Dropped packets total: " << mNumDroppedPacketsTotal << std::endl + << "Dropped packets percentage: " << mDropPercentage << "%" << std::endl + << "Actual in bytes: " << mActualBytesIn << std::endl + << "Actual out bytes: " << mActualBytesOut << LL_ENDL; + mNumDroppedPackets = 0; +} diff --git a/indra/llmessage/llpacketring.h b/indra/llmessage/llpacketring.h index 0dff2c63b1..237efc12e0 100644 --- a/indra/llmessage/llpacketring.h +++ b/indra/llmessage/llpacketring.h @@ -62,6 +62,9 @@ public: S32 getNumBufferedPackets() const { return (S32)(mNumBufferedPackets); } S32 getNumBufferedBytes() const { return mNumBufferedBytes; } + S32 getNumDroppedPackets() const { return mNumDroppedPacketsTotal + mNumDroppedPackets; } + + void dumpPacketRingStats(); protected: // returns 'true' if we should intentionally drop a packet bool computeDrop(); @@ -80,6 +83,8 @@ protected: std::vector mPacketRing; S16 mHeadIndex { 0 }; S16 mNumBufferedPackets { 0 }; + S32 mNumDroppedPackets { 0 }; + S32 mNumDroppedPacketsTotal { 0 }; S32 mNumBufferedBytes { 0 }; S32 mActualBytesIn { 0 }; diff --git a/indra/llmessage/message.cpp b/indra/llmessage/message.cpp index c130b7a6db..ad1ff86807 100644 --- a/indra/llmessage/message.cpp +++ b/indra/llmessage/message.cpp @@ -724,6 +724,7 @@ bool LLMessageSystem::checkMessages(LockMessageChecker&, S64 frame_count ) // Check to see if we need to print debug info if ((mt_sec - mCircuitPrintTime) > mCircuitPrintFreq) { + mPacketRing.dumpPacketRingStats(); dumpCircuitInfo(); mCircuitPrintTime = mt_sec; } -- cgit v1.2.3 From 5c2a331c44424722f069cd7a6b167f24de1afb83 Mon Sep 17 00:00:00 2001 From: Andrey Kleshchev Date: Wed, 5 Mar 2025 10:53:17 +0200 Subject: #3644 Fix new logging of packet drops hitting performance #2 Misunderstood how mDroppedPackets worked, clear out what's not needed --- indra/llmessage/llpacketring.cpp | 2 ++ 1 file changed, 2 insertions(+) (limited to 'indra/llmessage') diff --git a/indra/llmessage/llpacketring.cpp b/indra/llmessage/llpacketring.cpp index 470398152c..da3c502e9d 100644 --- a/indra/llmessage/llpacketring.cpp +++ b/indra/llmessage/llpacketring.cpp @@ -304,6 +304,8 @@ S32 LLPacketRing::drainSocket(S32 socket) S32 num_dropped_packets = (num_loops - 1 + old_num_packets) - mNumBufferedPackets; if (num_dropped_packets > 0) { + // It will eventually be accounted by mDroppedPackets + // and mPacketsLost, but track it here for logging purposes. mNumDroppedPackets += num_dropped_packets; } return (S32)(mNumBufferedPackets); -- cgit v1.2.3 From 483b4fa993b0bad9edeef1e74fb32009bb49f994 Mon Sep 17 00:00:00 2001 From: Bennett Goble Date: Fri, 29 Mar 2024 14:58:33 -0700 Subject: BUG-134040: Fix broken SOCKS5 proxy Second Life's SOCKS5 proxy has been broken on windows for at least six years due to a conflation of milliseconds and microseconds in the APR timeout value used when attempting to ping the proxy. # Conflicts: # indra/llmessage/llproxy.cpp --- indra/llmessage/llproxy.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'indra/llmessage') diff --git a/indra/llmessage/llproxy.cpp b/indra/llmessage/llproxy.cpp index 864e68998c..d04ca52ad6 100644 --- a/indra/llmessage/llproxy.cpp +++ b/indra/llmessage/llproxy.cpp @@ -465,7 +465,7 @@ void LLProxy::applyProxySettings(CURL* handle) /** * @brief Send one TCP packet and receive one in return. * - * This operation is done synchronously with a 1000ms timeout. Therefore, it should not be used when a blocking + * This operation is done synchronously with a 100ms timeout. Therefore, it should not be used when a blocking * operation would impact the operation of the viewer. * * @param handle_ptr Pointer to a connected LLSocket of type STREAM_TCP. @@ -482,7 +482,7 @@ static apr_status_t tcp_blocking_handshake(LLSocket::ptr_t handle, char * dataou apr_size_t expected_len = outlen; - handle->setBlocking(1000); + handle->setBlocking(100000); // 100ms, 100000us. Should be sufficient for localhost, nearby network rv = apr_socket_send(apr_socket, dataout, &outlen); if (APR_SUCCESS != rv) -- cgit v1.2.3 From 4e5dce794923736ff7c32d55f1e75361e89a9d31 Mon Sep 17 00:00:00 2001 From: Andrey Kleshchev Date: Tue, 18 Mar 2025 17:04:08 +0200 Subject: #3736 LLExperienceCache shutdown crash --- indra/llmessage/llexperiencecache.cpp | 2 ++ 1 file changed, 2 insertions(+) (limited to 'indra/llmessage') diff --git a/indra/llmessage/llexperiencecache.cpp b/indra/llmessage/llexperiencecache.cpp index b5d0c93376..83a070df32 100644 --- a/indra/llmessage/llexperiencecache.cpp +++ b/indra/llmessage/llexperiencecache.cpp @@ -94,6 +94,8 @@ LLExperienceCache::LLExperienceCache() LLExperienceCache::~LLExperienceCache() { + // can exit without cleanup() + sShutdown = true; } void LLExperienceCache::initSingleton() -- cgit v1.2.3 From c99e3167ed8549bc13d7df03b1e12dc15b0a080f Mon Sep 17 00:00:00 2001 From: Andrey Kleshchev Date: Thu, 13 Mar 2025 23:55:38 +0200 Subject: #3644 Adjust throttle based of how busy buffer is --- indra/llmessage/llpacketring.cpp | 6 ++++++ indra/llmessage/llpacketring.h | 1 + indra/llmessage/message.h | 4 +--- 3 files changed, 8 insertions(+), 3 deletions(-) (limited to 'indra/llmessage') diff --git a/indra/llmessage/llpacketring.cpp b/indra/llmessage/llpacketring.cpp index da3c502e9d..eb6650c6c5 100644 --- a/indra/llmessage/llpacketring.cpp +++ b/indra/llmessage/llpacketring.cpp @@ -344,6 +344,12 @@ bool LLPacketRing::expandRing() return true; } +F32 LLPacketRing::getBufferLoadRate() const +{ + // goes up to MAX_BUFFER_RING_SIZE + return (F32)mNumBufferedPackets / (F32)DEFAULT_BUFFER_RING_SIZE; +} + void LLPacketRing::dumpPacketRingStats() { mNumDroppedPacketsTotal += mNumDroppedPackets; diff --git a/indra/llmessage/llpacketring.h b/indra/llmessage/llpacketring.h index 237efc12e0..572dcbd271 100644 --- a/indra/llmessage/llpacketring.h +++ b/indra/llmessage/llpacketring.h @@ -64,6 +64,7 @@ public: S32 getNumBufferedBytes() const { return mNumBufferedBytes; } S32 getNumDroppedPackets() const { return mNumDroppedPacketsTotal + mNumDroppedPackets; } + F32 getBufferLoadRate() const; // from 0 to 4 (0 - empty, 1 - default size is full) void dumpPacketRingStats(); protected: // returns 'true' if we should intentionally drop a packet diff --git a/indra/llmessage/message.h b/indra/llmessage/message.h index 1844d5e7cd..30945cac51 100644 --- a/indra/llmessage/message.h +++ b/indra/llmessage/message.h @@ -538,7 +538,6 @@ public: //void buildMessage(); - S32 zeroCode(U8 **data, S32 *data_size); S32 zeroCodeExpand(U8 **data, S32 *data_size); S32 zeroCodeAdjustCurrentSendTotal(); @@ -755,6 +754,7 @@ public: S32 getReceiveBytes() const; S32 getUnackedListSize() const { return mUnackedListSize; } + F32 getBufferLoadRate() const { return mPacketRing.getBufferLoadRate(); } //const char* getCurrentSMessageName() const { return mCurrentSMessageName; } //const char* getCurrentSBlockName() const { return mCurrentSBlockName; } @@ -842,12 +842,10 @@ private: LLUUID mSessionID; void addTemplate(LLMessageTemplate *templatep); - bool decodeTemplate( const U8* buffer, S32 buffer_size, LLMessageTemplate** msg_template ); void logMsgFromInvalidCircuit( const LLHost& sender, bool recv_reliable ); void logTrustedMsgFromUntrustedCircuit( const LLHost& sender ); void logValidMsg(LLCircuitData *cdp, const LLHost& sender, bool recv_reliable, bool recv_resent, bool recv_acks ); - void logRanOffEndOfPacket( const LLHost& sender ); class LLMessageCountInfo { -- cgit v1.2.3 From 7fc9c0baa430875dce44778d5d72b302858f9f4e Mon Sep 17 00:00:00 2001 From: Andrey Kleshchev Date: Mon, 24 Mar 2025 22:20:53 +0200 Subject: #3796 Crash at assetRequestCoro Coroutine doesn't own req pointer, don't use it --- indra/llmessage/llassetstorage.cpp | 22 +++++++++------------- indra/llmessage/llassetstorage.h | 3 ++- 2 files changed, 11 insertions(+), 14 deletions(-) (limited to 'indra/llmessage') diff --git a/indra/llmessage/llassetstorage.cpp b/indra/llmessage/llassetstorage.cpp index 2de59c1b6a..10fd56a68e 100644 --- a/indra/llmessage/llassetstorage.cpp +++ b/indra/llmessage/llassetstorage.cpp @@ -585,7 +585,8 @@ void LLAssetStorage::getAssetData(const LLUUID uuid, // static void LLAssetStorage::removeAndCallbackPendingDownloads(const LLUUID& file_id, LLAssetType::EType file_type, const LLUUID& callback_id, LLAssetType::EType callback_type, - S32 result_code, LLExtStat ext_status) + S32 result_code, LLExtStat ext_status, + S32 bytes_fetched) { // find and callback ALL pending requests for this UUID // SJB: We process the callbacks in reverse order, I do not know if this is important, @@ -598,6 +599,10 @@ void LLAssetStorage::removeAndCallbackPendingDownloads(const LLUUID& file_id, LL LLAssetRequest* tmp = *curiter; if ((tmp->getUUID() == file_id) && (tmp->getType()== file_type)) { + if (bytes_fetched > 0) + { + tmp->mBytesFetched = bytes_fetched; + } requests.push_front(tmp); iter = gAssetStorage->mPendingDownloads.erase(curiter); } @@ -664,6 +669,7 @@ void LLAssetStorage::downloadCompleteCallback( callback_type = req->getType(); } + S32 bytes_fetched = 0; if (LL_ERR_NOERR == result) { // we might have gotten a zero-size file @@ -677,21 +683,11 @@ void LLAssetStorage::downloadCompleteCallback( } else { -#if 1 - for (request_list_t::iterator iter = gAssetStorage->mPendingDownloads.begin(); - iter != gAssetStorage->mPendingDownloads.end(); ++iter ) - { - LLAssetRequest* dlreq = *iter; - if ((dlreq->getUUID() == file_id) && (dlreq->getType()== file_type)) - { - dlreq->mBytesFetched = vfile.getSize(); - } - } -#endif + bytes_fetched = vfile.getSize(); } } - removeAndCallbackPendingDownloads(file_id, file_type, callback_id, callback_type, result, ext_status); + removeAndCallbackPendingDownloads(file_id, file_type, callback_id, callback_type, result, ext_status, bytes_fetched); } void LLAssetStorage::getEstateAsset( diff --git a/indra/llmessage/llassetstorage.h b/indra/llmessage/llassetstorage.h index 88fa572092..6d6526757d 100644 --- a/indra/llmessage/llassetstorage.h +++ b/indra/llmessage/llassetstorage.h @@ -324,7 +324,8 @@ public: static void removeAndCallbackPendingDownloads(const LLUUID& file_id, LLAssetType::EType file_type, const LLUUID& callback_id, LLAssetType::EType callback_type, - S32 result_code, LLExtStat ext_status); + S32 result_code, LLExtStat ext_status, + S32 bytes_fetched); // download process callbacks static void downloadCompleteCallback( -- cgit v1.2.3