summaryrefslogtreecommitdiff
path: root/indra/llmessage/llpacketring.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'indra/llmessage/llpacketring.cpp')
-rw-r--r--indra/llmessage/llpacketring.cpp462
1 files changed, 228 insertions, 234 deletions
diff --git a/indra/llmessage/llpacketring.cpp b/indra/llmessage/llpacketring.cpp
index be838770a8..eb6650c6c5 100644
--- a/indra/llmessage/llpacketring.cpp
+++ b/indra/llmessage/llpacketring.cpp
@@ -1,6 +1,6 @@
/**
* @file llpacketring.cpp
- * @brief implementation of LLPacketRing class for a packet.
+ * @brief implementation of LLPacketRing class.
*
* $LicenseInfo:firstyear=2001&license=viewerlgpl$
* Second Life Viewer Source Code
@@ -43,329 +43,323 @@
#include "message.h"
#include "u64.h"
-///////////////////////////////////////////////////////////
-LLPacketRing::LLPacketRing () :
- mUseInThrottle(false),
- mUseOutThrottle(false),
- mInThrottle(256000.f),
- mOutThrottle(64000.f),
- mActualBitsIn(0),
- mActualBitsOut(0),
- mMaxBufferLength(64000),
- mInBufferLength(0),
- mOutBufferLength(0),
- mDropPercentage(0.0f),
- mPacketsToDrop(0x0)
+constexpr S16 MAX_BUFFER_RING_SIZE = 1024;
+constexpr S16 DEFAULT_BUFFER_RING_SIZE = 256;
+
+LLPacketRing::LLPacketRing ()
+ : mPacketRing(DEFAULT_BUFFER_RING_SIZE, nullptr)
{
+ LLHost invalid_host;
+ for (size_t i = 0; i < mPacketRing.size(); ++i)
+ {
+ mPacketRing[i] = new LLPacketBuffer(invalid_host, nullptr, 0);
+ }
}
-///////////////////////////////////////////////////////////
LLPacketRing::~LLPacketRing ()
{
- cleanup();
+ for (auto packet : mPacketRing)
+ {
+ delete packet;
+ }
+ mPacketRing.clear();
+ mNumBufferedPackets = 0;
+ mNumBufferedBytes = 0;
+ mHeadIndex = 0;
}
-///////////////////////////////////////////////////////////
-void LLPacketRing::cleanup ()
+S32 LLPacketRing::receivePacket (S32 socket, char *datap)
{
- LLPacketBuffer *packetp;
+ bool drop = computeDrop();
+ return (mNumBufferedPackets > 0) ?
+ receiveOrDropBufferedPacket(datap, drop) :
+ receiveOrDropPacket(socket, datap, drop);
+}
- while (!mReceiveQueue.empty())
+bool send_packet_helper(int socket, const char * datap, S32 data_size, LLHost host)
+{
+ if (!LLProxy::isSOCKSProxyEnabled())
{
- packetp = mReceiveQueue.front();
- delete packetp;
- mReceiveQueue.pop();
+ return send_packet(socket, datap, data_size, host.getAddress(), host.getPort());
}
- while (!mSendQueue.empty())
- {
- packetp = mSendQueue.front();
- delete packetp;
- mSendQueue.pop();
- }
-}
+ char headered_send_buffer[NET_BUFFER_SIZE + SOCKS_HEADER_SIZE];
-///////////////////////////////////////////////////////////
-void LLPacketRing::dropPackets (U32 num_to_drop)
-{
- mPacketsToDrop += num_to_drop;
-}
+ proxywrap_t *socks_header = static_cast<proxywrap_t*>(static_cast<void*>(&headered_send_buffer));
+ socks_header->rsv = 0;
+ socks_header->addr = host.getAddress();
+ socks_header->port = htons(host.getPort());
+ socks_header->atype = ADDRESS_IPV4;
+ socks_header->frag = 0;
-///////////////////////////////////////////////////////////
-void LLPacketRing::setDropPercentage (F32 percent_to_drop)
-{
- mDropPercentage = percent_to_drop;
-}
+ memcpy(headered_send_buffer + SOCKS_HEADER_SIZE, datap, data_size);
-void LLPacketRing::setUseInThrottle(const bool use_throttle)
-{
- mUseInThrottle = use_throttle;
+ return send_packet( socket,
+ headered_send_buffer,
+ data_size + SOCKS_HEADER_SIZE,
+ LLProxy::getInstance()->getUDPProxy().getAddress(),
+ LLProxy::getInstance()->getUDPProxy().getPort());
}
-void LLPacketRing::setUseOutThrottle(const bool use_throttle)
+bool LLPacketRing::sendPacket(int socket, const char * datap, S32 data_size, LLHost host)
{
- mUseOutThrottle = use_throttle;
+ mActualBytesOut += data_size;
+ return send_packet_helper(socket, datap, data_size, host);
}
-void LLPacketRing::setInBandwidth(const F32 bps)
+void LLPacketRing::dropPackets (U32 num_to_drop)
{
- mInThrottle.setRate(bps);
+ mPacketsToDrop += num_to_drop;
}
-void LLPacketRing::setOutBandwidth(const F32 bps)
+void LLPacketRing::setDropPercentage (F32 percent_to_drop)
{
- mOutThrottle.setRate(bps);
+ mDropPercentage = percent_to_drop;
}
-///////////////////////////////////////////////////////////
-S32 LLPacketRing::receiveFromRing (S32 socket, char *datap)
-{
- if (mInThrottle.checkOverflow(0))
+bool LLPacketRing::computeDrop()
+{
+ bool drop= (mDropPercentage > 0.0f && (ll_frand(100.f) < mDropPercentage));
+ if (drop)
{
- // We don't have enough bandwidth, don't give them a packet.
- return 0;
+ ++mPacketsToDrop;
}
-
- LLPacketBuffer *packetp = NULL;
- if (mReceiveQueue.empty())
+ if (mPacketsToDrop > 0)
{
- // No packets on the queue, don't give them any.
- return 0;
+ --mPacketsToDrop;
+ drop = true;
}
-
- S32 packet_size = 0;
- packetp = mReceiveQueue.front();
- mReceiveQueue.pop();
- packet_size = packetp->getSize();
- if (packetp->getData() != NULL)
- {
- memcpy(datap, packetp->getData(), packet_size); /*Flawfinder: ignore*/
- }
- // need to set sender IP/port!!
- mLastSender = packetp->getHost();
- mLastReceivingIF = packetp->getReceivingInterface();
- delete packetp;
-
- this->mInBufferLength -= packet_size;
-
- // Adjust the throttle
- mInThrottle.throttleOverflow(packet_size * 8.f);
- return packet_size;
+ return drop;
}
-///////////////////////////////////////////////////////////
-S32 LLPacketRing::receivePacket (S32 socket, char *datap)
+S32 LLPacketRing::receiveOrDropPacket(S32 socket, char *datap, bool drop)
{
S32 packet_size = 0;
- // If using the throttle, simulate a limited size input buffer.
- if (mUseInThrottle)
+ // pull straight from socket
+ if (LLProxy::isSOCKSProxyEnabled())
{
- bool done = false;
-
- // push any current net packet (if any) onto delay ring
- while (!done)
+ char buffer[NET_BUFFER_SIZE + SOCKS_HEADER_SIZE]; /* Flawfinder ignore */
+ packet_size = receive_packet(socket, buffer);
+ if (packet_size > 0)
{
- LLPacketBuffer *packetp;
- packetp = new LLPacketBuffer(socket);
-
- if (packetp->getSize())
- {
- mActualBitsIn += packetp->getSize() * 8;
-
- // Fake packet loss
- if (mDropPercentage && (ll_frand(100.f) < mDropPercentage))
- {
- mPacketsToDrop++;
- }
-
- if (mPacketsToDrop)
- {
- delete packetp;
- packetp = NULL;
- packet_size = 0;
- mPacketsToDrop--;
- }
- }
+ mActualBytesIn += packet_size;
+ }
- // If we faked packet loss, then we don't have a packet
- // to use for buffer overflow testing
- if (packetp)
+ if (packet_size > SOCKS_HEADER_SIZE)
+ {
+ if (drop)
{
- if (mInBufferLength + packetp->getSize() > mMaxBufferLength)
- {
- // Toss it.
- LL_WARNS() << "Throwing away packet, overflowing buffer" << LL_ENDL;
- delete packetp;
- packetp = NULL;
- }
- else if (packetp->getSize())
- {
- mReceiveQueue.push(packetp);
- mInBufferLength += packetp->getSize();
- }
- else
- {
- delete packetp;
- packetp = NULL;
- done = true;
- }
+ packet_size = 0;
}
else
{
- // No packetp, keep going? - no packetp == faked packet loss
- }
- }
-
- // Now, grab data off of the receive queue according to our
- // throttled bandwidth settings.
- packet_size = receiveFromRing(socket, datap);
- }
- else
- {
- // no delay, pull straight from net
- if (LLProxy::isSOCKSProxyEnabled())
- {
- U8 buffer[NET_BUFFER_SIZE + SOCKS_HEADER_SIZE];
- packet_size = receive_packet(socket, static_cast<char*>(static_cast<void*>(buffer)));
-
- if (packet_size > SOCKS_HEADER_SIZE)
- {
// *FIX We are assuming ATYP is 0x01 (IPv4), not 0x03 (hostname) or 0x04 (IPv6)
- memcpy(datap, buffer + SOCKS_HEADER_SIZE, packet_size - SOCKS_HEADER_SIZE);
+ packet_size -= SOCKS_HEADER_SIZE; // The unwrapped packet size
+ memcpy(datap, buffer + SOCKS_HEADER_SIZE, packet_size);
proxywrap_t * header = static_cast<proxywrap_t*>(static_cast<void*>(buffer));
mLastSender.setAddress(header->addr);
mLastSender.setPort(ntohs(header->port));
-
- packet_size -= SOCKS_HEADER_SIZE; // The unwrapped packet size
- }
- else
- {
- packet_size = 0;
+ mLastReceivingIF = ::get_receiving_interface();
}
}
else
{
- packet_size = receive_packet(socket, datap);
- mLastSender = ::get_sender();
+ packet_size = 0;
}
-
- mLastReceivingIF = ::get_receiving_interface();
-
- if (packet_size) // did we actually get a packet?
+ }
+ else
+ {
+ packet_size = receive_packet(socket, datap);
+ if (packet_size > 0)
{
- if (mDropPercentage && (ll_frand(100.f) < mDropPercentage))
+ mActualBytesIn += packet_size;
+ if (drop)
{
- mPacketsToDrop++;
+ packet_size = 0;
}
-
- if (mPacketsToDrop)
+ else
{
- packet_size = 0;
- mPacketsToDrop--;
+ mLastSender = ::get_sender();
+ mLastReceivingIF = ::get_receiving_interface();
}
}
}
-
return packet_size;
}
-bool LLPacketRing::sendPacket(int h_socket, char * send_buffer, S32 buf_size, LLHost host)
+S32 LLPacketRing::receiveOrDropBufferedPacket(char *datap, bool drop)
{
- bool status = true;
- if (!mUseOutThrottle)
+ assert(mNumBufferedPackets > 0);
+ S32 packet_size = 0;
+
+ S16 ring_size = (S16)(mPacketRing.size());
+ S16 packet_index = (mHeadIndex + ring_size - mNumBufferedPackets) % ring_size;
+ LLPacketBuffer* packet = mPacketRing[packet_index];
+ packet_size = packet->getSize();
+ mLastSender = packet->getHost();
+ mLastReceivingIF = packet->getReceivingInterface();
+
+ --mNumBufferedPackets;
+ mNumBufferedBytes -= packet_size;
+ if (mNumBufferedPackets == 0)
{
- return sendPacketImpl(h_socket, send_buffer, buf_size, host );
+ assert(mNumBufferedBytes == 0);
+ }
+
+ if (!drop)
+ {
+ assert(packet_size > 0);
+ memcpy(datap, packet->getData(), packet_size);
}
else
{
- mActualBitsOut += buf_size * 8;
- LLPacketBuffer *packetp = NULL;
- // See if we've got enough throttle to send a packet.
- while (!mOutThrottle.checkOverflow(0.f))
- {
- // While we have enough bandwidth, send a packet from the queue or the current packet
+ packet_size = 0;
+ }
+ return packet_size;
+}
+
+S32 LLPacketRing::bufferInboundPacket(S32 socket)
+{
+ if (mNumBufferedPackets == mPacketRing.size() && mNumBufferedPackets < MAX_BUFFER_RING_SIZE)
+ {
+ expandRing();
+ }
- S32 packet_size = 0;
- if (!mSendQueue.empty())
+ LLPacketBuffer* packet = mPacketRing[mHeadIndex];
+ S32 old_packet_size = packet->getSize();
+ S32 packet_size = 0;
+ if (LLProxy::isSOCKSProxyEnabled())
+ {
+ char buffer[NET_BUFFER_SIZE + SOCKS_HEADER_SIZE]; /* Flawfinder ignore */
+ packet_size = receive_packet(socket, buffer);
+ if (packet_size > 0)
+ {
+ mActualBytesIn += packet_size;
+ if (packet_size > SOCKS_HEADER_SIZE)
{
- // Send a packet off of the queue
- LLPacketBuffer *packetp = mSendQueue.front();
- mSendQueue.pop();
+ // *FIX We are assuming ATYP is 0x01 (IPv4), not 0x03 (hostname) or 0x04 (IPv6)
- mOutBufferLength -= packetp->getSize();
- packet_size = packetp->getSize();
+ proxywrap_t * header = static_cast<proxywrap_t*>(static_cast<void*>(buffer));
+ LLHost sender;
+ sender.setAddress(header->addr);
+ sender.setPort(ntohs(header->port));
- status = sendPacketImpl(h_socket, packetp->getData(), packet_size, packetp->getHost());
+ packet_size -= SOCKS_HEADER_SIZE; // The unwrapped packet size
+ packet->init(buffer + SOCKS_HEADER_SIZE, packet_size, sender);
- delete packetp;
- // Update the throttle
- mOutThrottle.throttleOverflow(packet_size * 8.f);
+ mHeadIndex = (mHeadIndex + 1) % (S16)(mPacketRing.size());
+ if (mNumBufferedPackets < MAX_BUFFER_RING_SIZE)
+ {
+ ++mNumBufferedPackets;
+ mNumBufferedBytes += packet_size;
+ }
+ else
+ {
+ // we overwrote an older packet
+ mNumBufferedBytes += packet_size - old_packet_size;
+ }
}
else
{
- // If the queue's empty, we can just send this packet right away.
- status = sendPacketImpl(h_socket, send_buffer, buf_size, host );
- packet_size = buf_size;
-
- // Update the throttle
- mOutThrottle.throttleOverflow(packet_size * 8.f);
-
- // This was the packet we're sending now, there are no other packets
- // that we need to send
- return status;
+ packet_size = 0;
}
-
- }
-
- // We haven't sent the incoming packet, add it to the queue
- if (mOutBufferLength + buf_size > mMaxBufferLength)
- {
- // Nuke this packet, we overflowed the buffer.
- // Toss it.
- LL_WARNS() << "Throwing away outbound packet, overflowing buffer" << LL_ENDL;
}
- else
+ }
+ else
+ {
+ packet->init(socket);
+ packet_size = packet->getSize();
+ if (packet_size > 0)
{
- static LLTimer queue_timer;
- if ((mOutBufferLength > 4192) && queue_timer.getElapsedTimeF32() > 1.f)
+ mActualBytesIn += packet_size;
+
+ mHeadIndex = (mHeadIndex + 1) % (S16)(mPacketRing.size());
+ if (mNumBufferedPackets < MAX_BUFFER_RING_SIZE)
{
- // Add it to the queue
- LL_INFOS() << "Outbound packet queue " << mOutBufferLength << " bytes" << LL_ENDL;
- queue_timer.reset();
+ ++mNumBufferedPackets;
+ mNumBufferedBytes += packet_size;
+ }
+ else
+ {
+ // we overwrote an older packet
+ mNumBufferedBytes += packet_size - old_packet_size;
}
- packetp = new LLPacketBuffer(host, send_buffer, buf_size);
-
- mOutBufferLength += packetp->getSize();
- mSendQueue.push(packetp);
}
}
+ return packet_size;
+}
- return status;
+S32 LLPacketRing::drainSocket(S32 socket)
+{
+ // drain into buffer
+ S32 packet_size = 1;
+ S32 num_loops = 0;
+ S32 old_num_packets = mNumBufferedPackets;
+ while (packet_size > 0)
+ {
+ packet_size = bufferInboundPacket(socket);
+ ++num_loops;
+ }
+ S32 num_dropped_packets = (num_loops - 1 + old_num_packets) - mNumBufferedPackets;
+ if (num_dropped_packets > 0)
+ {
+ // It will eventually be accounted by mDroppedPackets
+ // and mPacketsLost, but track it here for logging purposes.
+ mNumDroppedPackets += num_dropped_packets;
+ }
+ return (S32)(mNumBufferedPackets);
}
-bool LLPacketRing::sendPacketImpl(int h_socket, const char * send_buffer, S32 buf_size, LLHost host)
+bool LLPacketRing::expandRing()
{
+ // compute larger size
+ constexpr S16 BUFFER_RING_EXPANSION = 256;
+ S16 old_size = (S16)(mPacketRing.size());
+ S16 new_size = llmin(old_size + BUFFER_RING_EXPANSION, MAX_BUFFER_RING_SIZE);
+ if (new_size == old_size)
+ {
+ // mPacketRing is already maxed out
+ return false;
+ }
- if (!LLProxy::isSOCKSProxyEnabled())
+ // make a larger ring and copy packet pointers
+ std::vector<LLPacketBuffer*> new_ring(new_size, nullptr);
+ for (S16 i = 0; i < old_size; ++i)
{
- return send_packet(h_socket, send_buffer, buf_size, host.getAddress(), host.getPort());
+ S16 j = (mHeadIndex + i) % old_size;
+ new_ring[i] = mPacketRing[j];
}
- char headered_send_buffer[NET_BUFFER_SIZE + SOCKS_HEADER_SIZE];
+ // allocate new packets for the remainder of new_ring
+ LLHost invalid_host;
+ for (S16 i = old_size; i < new_size; ++i)
+ {
+ new_ring[i] = new LLPacketBuffer(invalid_host, nullptr, 0);
+ }
- proxywrap_t *socks_header = static_cast<proxywrap_t*>(static_cast<void*>(&headered_send_buffer));
- socks_header->rsv = 0;
- socks_header->addr = host.getAddress();
- socks_header->port = htons(host.getPort());
- socks_header->atype = ADDRESS_IPV4;
- socks_header->frag = 0;
+ // swap the rings and reset mHeadIndex
+ mPacketRing.swap(new_ring);
+ mHeadIndex = mNumBufferedPackets;
+ return true;
+}
- memcpy(headered_send_buffer + SOCKS_HEADER_SIZE, send_buffer, buf_size);
+F32 LLPacketRing::getBufferLoadRate() const
+{
+ // goes up to MAX_BUFFER_RING_SIZE
+ return (F32)mNumBufferedPackets / (F32)DEFAULT_BUFFER_RING_SIZE;
+}
- return send_packet( h_socket,
- headered_send_buffer,
- buf_size + SOCKS_HEADER_SIZE,
- LLProxy::getInstance()->getUDPProxy().getAddress(),
- LLProxy::getInstance()->getUDPProxy().getPort());
+void LLPacketRing::dumpPacketRingStats()
+{
+ mNumDroppedPacketsTotal += mNumDroppedPackets;
+ LL_INFOS("Messaging") << "Packet ring stats: " << std::endl
+ << "Buffered packets: " << mNumBufferedPackets << std::endl
+ << "Buffered bytes: " << mNumBufferedBytes << std::endl
+ << "Dropped packets current: " << mNumDroppedPackets << std::endl
+ << "Dropped packets total: " << mNumDroppedPacketsTotal << std::endl
+ << "Dropped packets percentage: " << mDropPercentage << "%" << std::endl
+ << "Actual in bytes: " << mActualBytesIn << std::endl
+ << "Actual out bytes: " << mActualBytesOut << LL_ENDL;
+ mNumDroppedPackets = 0;
}