diff options
Diffstat (limited to 'indra/llmessage/llpacketring.cpp')
-rw-r--r-- | indra/llmessage/llpacketring.cpp | 462 |
1 files changed, 228 insertions, 234 deletions
diff --git a/indra/llmessage/llpacketring.cpp b/indra/llmessage/llpacketring.cpp index be838770a8..eb6650c6c5 100644 --- a/indra/llmessage/llpacketring.cpp +++ b/indra/llmessage/llpacketring.cpp @@ -1,6 +1,6 @@ /** * @file llpacketring.cpp - * @brief implementation of LLPacketRing class for a packet. + * @brief implementation of LLPacketRing class. * * $LicenseInfo:firstyear=2001&license=viewerlgpl$ * Second Life Viewer Source Code @@ -43,329 +43,323 @@ #include "message.h" #include "u64.h" -/////////////////////////////////////////////////////////// -LLPacketRing::LLPacketRing () : - mUseInThrottle(false), - mUseOutThrottle(false), - mInThrottle(256000.f), - mOutThrottle(64000.f), - mActualBitsIn(0), - mActualBitsOut(0), - mMaxBufferLength(64000), - mInBufferLength(0), - mOutBufferLength(0), - mDropPercentage(0.0f), - mPacketsToDrop(0x0) +constexpr S16 MAX_BUFFER_RING_SIZE = 1024; +constexpr S16 DEFAULT_BUFFER_RING_SIZE = 256; + +LLPacketRing::LLPacketRing () + : mPacketRing(DEFAULT_BUFFER_RING_SIZE, nullptr) { + LLHost invalid_host; + for (size_t i = 0; i < mPacketRing.size(); ++i) + { + mPacketRing[i] = new LLPacketBuffer(invalid_host, nullptr, 0); + } } -/////////////////////////////////////////////////////////// LLPacketRing::~LLPacketRing () { - cleanup(); + for (auto packet : mPacketRing) + { + delete packet; + } + mPacketRing.clear(); + mNumBufferedPackets = 0; + mNumBufferedBytes = 0; + mHeadIndex = 0; } -/////////////////////////////////////////////////////////// -void LLPacketRing::cleanup () +S32 LLPacketRing::receivePacket (S32 socket, char *datap) { - LLPacketBuffer *packetp; + bool drop = computeDrop(); + return (mNumBufferedPackets > 0) ? + receiveOrDropBufferedPacket(datap, drop) : + receiveOrDropPacket(socket, datap, drop); +} - while (!mReceiveQueue.empty()) +bool send_packet_helper(int socket, const char * datap, S32 data_size, LLHost host) +{ + if (!LLProxy::isSOCKSProxyEnabled()) { - packetp = mReceiveQueue.front(); - delete packetp; - mReceiveQueue.pop(); + return send_packet(socket, datap, data_size, host.getAddress(), host.getPort()); } - while (!mSendQueue.empty()) - { - packetp = mSendQueue.front(); - delete packetp; - mSendQueue.pop(); - } -} + char headered_send_buffer[NET_BUFFER_SIZE + SOCKS_HEADER_SIZE]; -/////////////////////////////////////////////////////////// -void LLPacketRing::dropPackets (U32 num_to_drop) -{ - mPacketsToDrop += num_to_drop; -} + proxywrap_t *socks_header = static_cast<proxywrap_t*>(static_cast<void*>(&headered_send_buffer)); + socks_header->rsv = 0; + socks_header->addr = host.getAddress(); + socks_header->port = htons(host.getPort()); + socks_header->atype = ADDRESS_IPV4; + socks_header->frag = 0; -/////////////////////////////////////////////////////////// -void LLPacketRing::setDropPercentage (F32 percent_to_drop) -{ - mDropPercentage = percent_to_drop; -} + memcpy(headered_send_buffer + SOCKS_HEADER_SIZE, datap, data_size); -void LLPacketRing::setUseInThrottle(const bool use_throttle) -{ - mUseInThrottle = use_throttle; + return send_packet( socket, + headered_send_buffer, + data_size + SOCKS_HEADER_SIZE, + LLProxy::getInstance()->getUDPProxy().getAddress(), + LLProxy::getInstance()->getUDPProxy().getPort()); } -void LLPacketRing::setUseOutThrottle(const bool use_throttle) +bool LLPacketRing::sendPacket(int socket, const char * datap, S32 data_size, LLHost host) { - mUseOutThrottle = use_throttle; + mActualBytesOut += data_size; + return send_packet_helper(socket, datap, data_size, host); } -void LLPacketRing::setInBandwidth(const F32 bps) +void LLPacketRing::dropPackets (U32 num_to_drop) { - mInThrottle.setRate(bps); + mPacketsToDrop += num_to_drop; } -void LLPacketRing::setOutBandwidth(const F32 bps) +void LLPacketRing::setDropPercentage (F32 percent_to_drop) { - mOutThrottle.setRate(bps); + mDropPercentage = percent_to_drop; } -/////////////////////////////////////////////////////////// -S32 LLPacketRing::receiveFromRing (S32 socket, char *datap) -{ - if (mInThrottle.checkOverflow(0)) +bool LLPacketRing::computeDrop() +{ + bool drop= (mDropPercentage > 0.0f && (ll_frand(100.f) < mDropPercentage)); + if (drop) { - // We don't have enough bandwidth, don't give them a packet. - return 0; + ++mPacketsToDrop; } - - LLPacketBuffer *packetp = NULL; - if (mReceiveQueue.empty()) + if (mPacketsToDrop > 0) { - // No packets on the queue, don't give them any. - return 0; + --mPacketsToDrop; + drop = true; } - - S32 packet_size = 0; - packetp = mReceiveQueue.front(); - mReceiveQueue.pop(); - packet_size = packetp->getSize(); - if (packetp->getData() != NULL) - { - memcpy(datap, packetp->getData(), packet_size); /*Flawfinder: ignore*/ - } - // need to set sender IP/port!! - mLastSender = packetp->getHost(); - mLastReceivingIF = packetp->getReceivingInterface(); - delete packetp; - - this->mInBufferLength -= packet_size; - - // Adjust the throttle - mInThrottle.throttleOverflow(packet_size * 8.f); - return packet_size; + return drop; } -/////////////////////////////////////////////////////////// -S32 LLPacketRing::receivePacket (S32 socket, char *datap) +S32 LLPacketRing::receiveOrDropPacket(S32 socket, char *datap, bool drop) { S32 packet_size = 0; - // If using the throttle, simulate a limited size input buffer. - if (mUseInThrottle) + // pull straight from socket + if (LLProxy::isSOCKSProxyEnabled()) { - bool done = false; - - // push any current net packet (if any) onto delay ring - while (!done) + char buffer[NET_BUFFER_SIZE + SOCKS_HEADER_SIZE]; /* Flawfinder ignore */ + packet_size = receive_packet(socket, buffer); + if (packet_size > 0) { - LLPacketBuffer *packetp; - packetp = new LLPacketBuffer(socket); - - if (packetp->getSize()) - { - mActualBitsIn += packetp->getSize() * 8; - - // Fake packet loss - if (mDropPercentage && (ll_frand(100.f) < mDropPercentage)) - { - mPacketsToDrop++; - } - - if (mPacketsToDrop) - { - delete packetp; - packetp = NULL; - packet_size = 0; - mPacketsToDrop--; - } - } + mActualBytesIn += packet_size; + } - // If we faked packet loss, then we don't have a packet - // to use for buffer overflow testing - if (packetp) + if (packet_size > SOCKS_HEADER_SIZE) + { + if (drop) { - if (mInBufferLength + packetp->getSize() > mMaxBufferLength) - { - // Toss it. - LL_WARNS() << "Throwing away packet, overflowing buffer" << LL_ENDL; - delete packetp; - packetp = NULL; - } - else if (packetp->getSize()) - { - mReceiveQueue.push(packetp); - mInBufferLength += packetp->getSize(); - } - else - { - delete packetp; - packetp = NULL; - done = true; - } + packet_size = 0; } else { - // No packetp, keep going? - no packetp == faked packet loss - } - } - - // Now, grab data off of the receive queue according to our - // throttled bandwidth settings. - packet_size = receiveFromRing(socket, datap); - } - else - { - // no delay, pull straight from net - if (LLProxy::isSOCKSProxyEnabled()) - { - U8 buffer[NET_BUFFER_SIZE + SOCKS_HEADER_SIZE]; - packet_size = receive_packet(socket, static_cast<char*>(static_cast<void*>(buffer))); - - if (packet_size > SOCKS_HEADER_SIZE) - { // *FIX We are assuming ATYP is 0x01 (IPv4), not 0x03 (hostname) or 0x04 (IPv6) - memcpy(datap, buffer + SOCKS_HEADER_SIZE, packet_size - SOCKS_HEADER_SIZE); + packet_size -= SOCKS_HEADER_SIZE; // The unwrapped packet size + memcpy(datap, buffer + SOCKS_HEADER_SIZE, packet_size); proxywrap_t * header = static_cast<proxywrap_t*>(static_cast<void*>(buffer)); mLastSender.setAddress(header->addr); mLastSender.setPort(ntohs(header->port)); - - packet_size -= SOCKS_HEADER_SIZE; // The unwrapped packet size - } - else - { - packet_size = 0; + mLastReceivingIF = ::get_receiving_interface(); } } else { - packet_size = receive_packet(socket, datap); - mLastSender = ::get_sender(); + packet_size = 0; } - - mLastReceivingIF = ::get_receiving_interface(); - - if (packet_size) // did we actually get a packet? + } + else + { + packet_size = receive_packet(socket, datap); + if (packet_size > 0) { - if (mDropPercentage && (ll_frand(100.f) < mDropPercentage)) + mActualBytesIn += packet_size; + if (drop) { - mPacketsToDrop++; + packet_size = 0; } - - if (mPacketsToDrop) + else { - packet_size = 0; - mPacketsToDrop--; + mLastSender = ::get_sender(); + mLastReceivingIF = ::get_receiving_interface(); } } } - return packet_size; } -bool LLPacketRing::sendPacket(int h_socket, char * send_buffer, S32 buf_size, LLHost host) +S32 LLPacketRing::receiveOrDropBufferedPacket(char *datap, bool drop) { - bool status = true; - if (!mUseOutThrottle) + assert(mNumBufferedPackets > 0); + S32 packet_size = 0; + + S16 ring_size = (S16)(mPacketRing.size()); + S16 packet_index = (mHeadIndex + ring_size - mNumBufferedPackets) % ring_size; + LLPacketBuffer* packet = mPacketRing[packet_index]; + packet_size = packet->getSize(); + mLastSender = packet->getHost(); + mLastReceivingIF = packet->getReceivingInterface(); + + --mNumBufferedPackets; + mNumBufferedBytes -= packet_size; + if (mNumBufferedPackets == 0) { - return sendPacketImpl(h_socket, send_buffer, buf_size, host ); + assert(mNumBufferedBytes == 0); + } + + if (!drop) + { + assert(packet_size > 0); + memcpy(datap, packet->getData(), packet_size); } else { - mActualBitsOut += buf_size * 8; - LLPacketBuffer *packetp = NULL; - // See if we've got enough throttle to send a packet. - while (!mOutThrottle.checkOverflow(0.f)) - { - // While we have enough bandwidth, send a packet from the queue or the current packet + packet_size = 0; + } + return packet_size; +} + +S32 LLPacketRing::bufferInboundPacket(S32 socket) +{ + if (mNumBufferedPackets == mPacketRing.size() && mNumBufferedPackets < MAX_BUFFER_RING_SIZE) + { + expandRing(); + } - S32 packet_size = 0; - if (!mSendQueue.empty()) + LLPacketBuffer* packet = mPacketRing[mHeadIndex]; + S32 old_packet_size = packet->getSize(); + S32 packet_size = 0; + if (LLProxy::isSOCKSProxyEnabled()) + { + char buffer[NET_BUFFER_SIZE + SOCKS_HEADER_SIZE]; /* Flawfinder ignore */ + packet_size = receive_packet(socket, buffer); + if (packet_size > 0) + { + mActualBytesIn += packet_size; + if (packet_size > SOCKS_HEADER_SIZE) { - // Send a packet off of the queue - LLPacketBuffer *packetp = mSendQueue.front(); - mSendQueue.pop(); + // *FIX We are assuming ATYP is 0x01 (IPv4), not 0x03 (hostname) or 0x04 (IPv6) - mOutBufferLength -= packetp->getSize(); - packet_size = packetp->getSize(); + proxywrap_t * header = static_cast<proxywrap_t*>(static_cast<void*>(buffer)); + LLHost sender; + sender.setAddress(header->addr); + sender.setPort(ntohs(header->port)); - status = sendPacketImpl(h_socket, packetp->getData(), packet_size, packetp->getHost()); + packet_size -= SOCKS_HEADER_SIZE; // The unwrapped packet size + packet->init(buffer + SOCKS_HEADER_SIZE, packet_size, sender); - delete packetp; - // Update the throttle - mOutThrottle.throttleOverflow(packet_size * 8.f); + mHeadIndex = (mHeadIndex + 1) % (S16)(mPacketRing.size()); + if (mNumBufferedPackets < MAX_BUFFER_RING_SIZE) + { + ++mNumBufferedPackets; + mNumBufferedBytes += packet_size; + } + else + { + // we overwrote an older packet + mNumBufferedBytes += packet_size - old_packet_size; + } } else { - // If the queue's empty, we can just send this packet right away. - status = sendPacketImpl(h_socket, send_buffer, buf_size, host ); - packet_size = buf_size; - - // Update the throttle - mOutThrottle.throttleOverflow(packet_size * 8.f); - - // This was the packet we're sending now, there are no other packets - // that we need to send - return status; + packet_size = 0; } - - } - - // We haven't sent the incoming packet, add it to the queue - if (mOutBufferLength + buf_size > mMaxBufferLength) - { - // Nuke this packet, we overflowed the buffer. - // Toss it. - LL_WARNS() << "Throwing away outbound packet, overflowing buffer" << LL_ENDL; } - else + } + else + { + packet->init(socket); + packet_size = packet->getSize(); + if (packet_size > 0) { - static LLTimer queue_timer; - if ((mOutBufferLength > 4192) && queue_timer.getElapsedTimeF32() > 1.f) + mActualBytesIn += packet_size; + + mHeadIndex = (mHeadIndex + 1) % (S16)(mPacketRing.size()); + if (mNumBufferedPackets < MAX_BUFFER_RING_SIZE) { - // Add it to the queue - LL_INFOS() << "Outbound packet queue " << mOutBufferLength << " bytes" << LL_ENDL; - queue_timer.reset(); + ++mNumBufferedPackets; + mNumBufferedBytes += packet_size; + } + else + { + // we overwrote an older packet + mNumBufferedBytes += packet_size - old_packet_size; } - packetp = new LLPacketBuffer(host, send_buffer, buf_size); - - mOutBufferLength += packetp->getSize(); - mSendQueue.push(packetp); } } + return packet_size; +} - return status; +S32 LLPacketRing::drainSocket(S32 socket) +{ + // drain into buffer + S32 packet_size = 1; + S32 num_loops = 0; + S32 old_num_packets = mNumBufferedPackets; + while (packet_size > 0) + { + packet_size = bufferInboundPacket(socket); + ++num_loops; + } + S32 num_dropped_packets = (num_loops - 1 + old_num_packets) - mNumBufferedPackets; + if (num_dropped_packets > 0) + { + // It will eventually be accounted by mDroppedPackets + // and mPacketsLost, but track it here for logging purposes. + mNumDroppedPackets += num_dropped_packets; + } + return (S32)(mNumBufferedPackets); } -bool LLPacketRing::sendPacketImpl(int h_socket, const char * send_buffer, S32 buf_size, LLHost host) +bool LLPacketRing::expandRing() { + // compute larger size + constexpr S16 BUFFER_RING_EXPANSION = 256; + S16 old_size = (S16)(mPacketRing.size()); + S16 new_size = llmin(old_size + BUFFER_RING_EXPANSION, MAX_BUFFER_RING_SIZE); + if (new_size == old_size) + { + // mPacketRing is already maxed out + return false; + } - if (!LLProxy::isSOCKSProxyEnabled()) + // make a larger ring and copy packet pointers + std::vector<LLPacketBuffer*> new_ring(new_size, nullptr); + for (S16 i = 0; i < old_size; ++i) { - return send_packet(h_socket, send_buffer, buf_size, host.getAddress(), host.getPort()); + S16 j = (mHeadIndex + i) % old_size; + new_ring[i] = mPacketRing[j]; } - char headered_send_buffer[NET_BUFFER_SIZE + SOCKS_HEADER_SIZE]; + // allocate new packets for the remainder of new_ring + LLHost invalid_host; + for (S16 i = old_size; i < new_size; ++i) + { + new_ring[i] = new LLPacketBuffer(invalid_host, nullptr, 0); + } - proxywrap_t *socks_header = static_cast<proxywrap_t*>(static_cast<void*>(&headered_send_buffer)); - socks_header->rsv = 0; - socks_header->addr = host.getAddress(); - socks_header->port = htons(host.getPort()); - socks_header->atype = ADDRESS_IPV4; - socks_header->frag = 0; + // swap the rings and reset mHeadIndex + mPacketRing.swap(new_ring); + mHeadIndex = mNumBufferedPackets; + return true; +} - memcpy(headered_send_buffer + SOCKS_HEADER_SIZE, send_buffer, buf_size); +F32 LLPacketRing::getBufferLoadRate() const +{ + // goes up to MAX_BUFFER_RING_SIZE + return (F32)mNumBufferedPackets / (F32)DEFAULT_BUFFER_RING_SIZE; +} - return send_packet( h_socket, - headered_send_buffer, - buf_size + SOCKS_HEADER_SIZE, - LLProxy::getInstance()->getUDPProxy().getAddress(), - LLProxy::getInstance()->getUDPProxy().getPort()); +void LLPacketRing::dumpPacketRingStats() +{ + mNumDroppedPacketsTotal += mNumDroppedPackets; + LL_INFOS("Messaging") << "Packet ring stats: " << std::endl + << "Buffered packets: " << mNumBufferedPackets << std::endl + << "Buffered bytes: " << mNumBufferedBytes << std::endl + << "Dropped packets current: " << mNumDroppedPackets << std::endl + << "Dropped packets total: " << mNumDroppedPacketsTotal << std::endl + << "Dropped packets percentage: " << mDropPercentage << "%" << std::endl + << "Actual in bytes: " << mActualBytesIn << std::endl + << "Actual out bytes: " << mActualBytesOut << LL_ENDL; + mNumDroppedPackets = 0; } |