/** * @file llcircuit.cpp * @brief Class to track UDP endpoints for the message system. * * $LicenseInfo:firstyear=2002&license=viewergpl$ * * Copyright (c) 2002-2009, Linden Research, Inc. * * Second Life Viewer Source Code * The source code in this file ("Source Code") is provided by Linden Lab * to you under the terms of the GNU General Public License, version 2.0 * ("GPL"), unless you have obtained a separate licensing agreement * ("Other License"), formally executed by you and Linden Lab. Terms of * the GPL can be found in doc/GPL-license.txt in this distribution, or * online at http://secondlifegrid.net/programs/open_source/licensing/gplv2 * * There are special exceptions to the terms and conditions of the GPL as * it is applied to this Source Code. View the full text of the exception * in the file doc/FLOSS-exception.txt in this software distribution, or * online at * http://secondlifegrid.net/programs/open_source/licensing/flossexception * * By copying, modifying or distributing this software, you acknowledge * that you have read and understood your obligations described above, * and agree to abide by those obligations. * * ALL LINDEN LAB SOURCE CODE IS PROVIDED "AS IS." LINDEN LAB MAKES NO * WARRANTIES, EXPRESS, IMPLIED OR OTHERWISE, REGARDING ITS ACCURACY, * COMPLETENESS OR PERFORMANCE. * $/LicenseInfo$ */ #include "linden_common.h" #if LL_WINDOWS #include <process.h> #else #if LL_LINUX #include <dlfcn.h> // RTLD_LAZY #endif #include <sys/types.h> #include <sys/socket.h> #include <netinet/in.h> #endif #if !defined(USE_CIRCUIT_LIST) #include <algorithm> #endif #include <sstream> #include <iterator> #include <stack> #include "llcircuit.h" #include "message.h" #include "llrand.h" #include "llstl.h" #include "lltransfermanager.h" #include "llmodularmath.h" const S32 PING_START_BLOCK = 3; // How many pings behind we have to be to consider ourself blocked. const S32 PING_RELEASE_BLOCK = 2; // How many pings behind we have to be to consider ourself unblocked. const F32 TARGET_PERIOD_LENGTH = 5.f; // seconds const F32 LL_DUPLICATE_SUPPRESSION_TIMEOUT = 60.f; //seconds - this can be long, as time-based cleanup is // only done when wrapping packetids, now... LLCircuitData::LLCircuitData(const LLHost &host, TPACKETID in_id, const F32 circuit_heartbeat_interval, const F32 circuit_timeout) : mHost (host), mWrapID(0), mPacketsOutID(0), mPacketsInID(in_id), mHighestPacketID(in_id), mTimeoutCallback(NULL), mTimeoutUserData(NULL), mTrusted(FALSE), mbAllowTimeout(TRUE), mbAlive(TRUE), mBlocked(FALSE), mPingTime(0.0), mLastPingSendTime(0.0), mLastPingReceivedTime(0.0), mNextPingSendTime(0.0), mPingsInTransit(0), mLastPingID(0), mPingDelay(INITIAL_PING_VALUE_MSEC), mPingDelayAveraged((F32)INITIAL_PING_VALUE_MSEC), mUnackedPacketCount(0), mUnackedPacketBytes(0), mLocalEndPointID(), mPacketsOut(0), mPacketsIn(0), mPacketsLost(0), mBytesIn(0), mBytesOut(0), mLastPeriodLength(-1.f), mBytesInLastPeriod(0), mBytesOutLastPeriod(0), mBytesInThisPeriod(0), mBytesOutThisPeriod(0), mPeakBPSIn(0.f), mPeakBPSOut(0.f), mPeriodTime(0.0), mExistenceTimer(), mCurrentResendCount(0), mLastPacketGap(0), mHeartbeatInterval(circuit_heartbeat_interval), mHeartbeatTimeout(circuit_timeout) { // Need to guarantee that this time is up to date, we may be creating a circuit even though we haven't been // running a message system loop. F64 mt_sec = LLMessageSystem::getMessageTimeSeconds(TRUE); F32 distribution_offset = ll_frand(); mPingTime = mt_sec; mLastPingSendTime = mt_sec + mHeartbeatInterval * distribution_offset; mLastPingReceivedTime = mt_sec; mNextPingSendTime = mLastPingSendTime + 0.95*mHeartbeatInterval + ll_frand(0.1f*mHeartbeatInterval); mPeriodTime = mt_sec; mLocalEndPointID.generate(); } LLCircuitData::~LLCircuitData() { LLReliablePacket *packetp = NULL; // Clean up all pending transfers. gTransferManager.cleanupConnection(mHost); // remove all pending reliable messages on this circuit std::vector<TPACKETID> doomed; reliable_iter iter; reliable_iter end = mUnackedPackets.end(); for(iter = mUnackedPackets.begin(); iter != end; ++iter) { packetp = iter->second; gMessageSystem->mFailedResendPackets++; if(gMessageSystem->mVerboseLog) { doomed.push_back(packetp->mPacketID); } if (packetp->mCallback) { packetp->mCallback(packetp->mCallbackData,LL_ERR_CIRCUIT_GONE); } // Update stats mUnackedPacketCount--; mUnackedPacketBytes -= packetp->mBufferLength; delete packetp; } // remove all pending final retry reliable messages on this circuit end = mFinalRetryPackets.end(); for(iter = mFinalRetryPackets.begin(); iter != end; ++iter) { packetp = iter->second; gMessageSystem->mFailedResendPackets++; if(gMessageSystem->mVerboseLog) { doomed.push_back(packetp->mPacketID); } if (packetp->mCallback) { packetp->mCallback(packetp->mCallbackData,LL_ERR_CIRCUIT_GONE); } // Update stats mUnackedPacketCount--; mUnackedPacketBytes -= packetp->mBufferLength; delete packetp; } // log aborted reliable packets for this circuit. if(gMessageSystem->mVerboseLog && !doomed.empty()) { std::ostringstream str; std::ostream_iterator<TPACKETID> append(str, " "); str << "MSG: -> " << mHost << "\tABORTING RELIABLE:\t"; std::copy(doomed.begin(), doomed.end(), append); llinfos << str.str() << llendl; } } void LLCircuitData::ackReliablePacket(TPACKETID packet_num) { reliable_iter iter; LLReliablePacket *packetp; iter = mUnackedPackets.find(packet_num); if (iter != mUnackedPackets.end()) { packetp = iter->second; if(gMessageSystem->mVerboseLog) { std::ostringstream str; str << "MSG: <- " << packetp->mHost << "\tRELIABLE ACKED:\t" << packetp->mPacketID; llinfos << str.str() << llendl; } if (packetp->mCallback) { if (packetp->mTimeout < 0.f) // negative timeout will always return timeout even for successful ack, for debugging { packetp->mCallback(packetp->mCallbackData,LL_ERR_TCP_TIMEOUT); } else { packetp->mCallback(packetp->mCallbackData,LL_ERR_NOERR); } } // Update stats mUnackedPacketCount--; mUnackedPacketBytes -= packetp->mBufferLength; // Cleanup delete packetp; mUnackedPackets.erase(iter); return; } iter = mFinalRetryPackets.find(packet_num); if (iter != mFinalRetryPackets.end()) { packetp = iter->second; // llinfos << "Packet " << packet_num << " removed from the pending list" << llendl; if(gMessageSystem->mVerboseLog) { std::ostringstream str; str << "MSG: <- " << packetp->mHost << "\tRELIABLE ACKED:\t" << packetp->mPacketID; llinfos << str.str() << llendl; } if (packetp->mCallback) { if (packetp->mTimeout < 0.f) // negative timeout will always return timeout even for successful ack, for debugging { packetp->mCallback(packetp->mCallbackData,LL_ERR_TCP_TIMEOUT); } else { packetp->mCallback(packetp->mCallbackData,LL_ERR_NOERR); } } // Update stats mUnackedPacketCount--; mUnackedPacketBytes -= packetp->mBufferLength; // Cleanup delete packetp; mFinalRetryPackets.erase(iter); } else { // Couldn't find this packet on either of the unacked lists. // maybe it's a duplicate ack? } } S32 LLCircuitData::resendUnackedPackets(const F64 now) { S32 resent_packets = 0; LLReliablePacket *packetp; // // Theoretically we should search through the list for the packet with the oldest // packet ID, as otherwise when we WRAP we will resend reliable packets out of order. // Since resends are ALREADY out of order, and wrapping is highly rare (16+million packets), // I'm not going to worry about this for now - djs // reliable_iter iter; BOOL have_resend_overflow = FALSE; for (iter = mUnackedPackets.begin(); iter != mUnackedPackets.end();) { packetp = iter->second; // Only check overflow if we haven't had one yet. if (!have_resend_overflow) { have_resend_overflow = mThrottles.checkOverflow(TC_RESEND, 0); } if (have_resend_overflow) { // We've exceeded our bandwidth for resends. // Time to stop trying to send them. // If we have too many unacked packets, we need to start dropping expired ones. if (mUnackedPacketBytes > 512000) { if (now > packetp->mExpirationTime) { // This circuit has overflowed. Do not retry. Do not pass go. packetp->mRetries = 0; // Remove it from this list and add it to the final list. mUnackedPackets.erase(iter++); mFinalRetryPackets[packetp->mPacketID] = packetp; } else { ++iter; } // Move on to the next unacked packet. continue; } if (mUnackedPacketBytes > 256000 && !(getPacketsOut() % 1024)) { // Warn if we've got a lot of resends waiting. llwarns << mHost << " has " << mUnackedPacketBytes << " bytes of reliable messages waiting" << llendl; } // Stop resending. There are less than 512000 unacked packets. break; } if (now > packetp->mExpirationTime) { packetp->mRetries--; // retry mCurrentResendCount++; gMessageSystem->mResentPackets++; if(gMessageSystem->mVerboseLog) { std::ostringstream str; str << "MSG: -> " << packetp->mHost << "\tRESENDING RELIABLE:\t" << packetp->mPacketID; llinfos << str.str() << llendl; } packetp->mBuffer[0] |= LL_RESENT_FLAG; // tag packet id as being a resend gMessageSystem->mPacketRing.sendPacket(packetp->mSocket, (char *)packetp->mBuffer, packetp->mBufferLength, packetp->mHost); mThrottles.throttleOverflow(TC_RESEND, packetp->mBufferLength * 8.f); // The new method, retry time based on ping if (packetp->mPingBasedRetry) { packetp->mExpirationTime = now + llmax(LL_MINIMUM_RELIABLE_TIMEOUT_SECONDS, (LL_RELIABLE_TIMEOUT_FACTOR * getPingDelayAveraged())); } else { // custom, constant retry time packetp->mExpirationTime = now + packetp->mTimeout; } if (!packetp->mRetries) { // Last resend, remove it from this list and add it to the final list. mUnackedPackets.erase(iter++); mFinalRetryPackets[packetp->mPacketID] = packetp; } else { // Don't remove it yet, it still gets to try to resend at least once. ++iter; } resent_packets++; } else { // Don't need to do anything with this packet, keep iterating. ++iter; } } for (iter = mFinalRetryPackets.begin(); iter != mFinalRetryPackets.end();) { packetp = iter->second; if (now > packetp->mExpirationTime) { // fail (too many retries) //llinfos << "Packet " << packetp->mPacketID << " removed from the pending list: exceeded retry limit" << llendl; //if (packetp->mMessageName) //{ // llinfos << "Packet name " << packetp->mMessageName << llendl; //} gMessageSystem->mFailedResendPackets++; if(gMessageSystem->mVerboseLog) { std::ostringstream str; str << "MSG: -> " << packetp->mHost << "\tABORTING RELIABLE:\t" << packetp->mPacketID; llinfos << str.str() << llendl; } if (packetp->mCallback) { packetp->mCallback(packetp->mCallbackData,LL_ERR_TCP_TIMEOUT); } // Update stats mUnackedPacketCount--; mUnackedPacketBytes -= packetp->mBufferLength; mFinalRetryPackets.erase(iter++); delete packetp; } else { ++iter; } } return mUnackedPacketCount; } LLCircuit::LLCircuit(const F32 circuit_heartbeat_interval, const F32 circuit_timeout) : mLastCircuit(NULL), mHeartbeatInterval(circuit_heartbeat_interval), mHeartbeatTimeout(circuit_timeout) { } LLCircuit::~LLCircuit() { // delete pointers in the map. std::for_each(mCircuitData.begin(), mCircuitData.end(), llcompose1( DeletePointerFunctor<LLCircuitData>(), llselect2nd<circuit_data_map::value_type>())); } LLCircuitData *LLCircuit::addCircuitData(const LLHost &host, TPACKETID in_id) { // This should really validate if one already exists llinfos << "LLCircuit::addCircuitData for " << host << llendl; LLCircuitData *tempp = new LLCircuitData(host, in_id, mHeartbeatInterval, mHeartbeatTimeout); mCircuitData.insert(circuit_data_map::value_type(host, tempp)); mPingSet.insert(tempp); mLastCircuit = tempp; return tempp; } void LLCircuit::removeCircuitData(const LLHost &host) { llinfos << "LLCircuit::removeCircuitData for " << host << llendl; mLastCircuit = NULL; circuit_data_map::iterator it = mCircuitData.find(host); if(it != mCircuitData.end()) { LLCircuitData *cdp = it->second; mCircuitData.erase(it); LLCircuit::ping_set_t::iterator psit = mPingSet.find(cdp); if (psit != mPingSet.end()) { mPingSet.erase(psit); } else { llwarns << "Couldn't find entry for next ping in ping set!" << llendl; } // Clean up from optimization maps mUnackedCircuitMap.erase(host); mSendAckMap.erase(host); delete cdp; } // This also has to happen AFTER we nuke the circuit, because various // callbacks for the circuit may result in messages being sent to // this circuit, and the setting of mLastCircuit. We don't check // if the host matches, but we don't really care because mLastCircuit // is an optimization, and this happens VERY rarely. mLastCircuit = NULL; } void LLCircuitData::setAlive(BOOL b_alive) { if (mbAlive != b_alive) { mPacketsOutID = 0; mPacketsInID = 0; mbAlive = b_alive; } if (b_alive) { mLastPingReceivedTime = LLMessageSystem::getMessageTimeSeconds(); mPingsInTransit = 0; mBlocked = FALSE; } } void LLCircuitData::setAllowTimeout(BOOL allow) { mbAllowTimeout = allow; if (allow) { // resuming circuit // make sure it's alive setAlive(TRUE); } } // Reset per-period counters if necessary. void LLCircuitData::checkPeriodTime() { F64 mt_sec = LLMessageSystem::getMessageTimeSeconds(); F64 period_length = mt_sec - mPeriodTime; if ( period_length > TARGET_PERIOD_LENGTH) { F32 bps_in = (F32)(mBytesInThisPeriod * 8.f / period_length); if (bps_in > mPeakBPSIn) { mPeakBPSIn = bps_in; } F32 bps_out = (F32)(mBytesOutThisPeriod * 8.f / period_length); if (bps_out > mPeakBPSOut) { mPeakBPSOut = bps_out; } mBytesInLastPeriod = mBytesInThisPeriod; mBytesOutLastPeriod = mBytesOutThisPeriod; mBytesInThisPeriod = 0; mBytesOutThisPeriod = 0; mLastPeriodLength = (F32)period_length; mPeriodTime = mt_sec; } } void LLCircuitData::addBytesIn(S32 bytes) { mBytesIn += bytes; mBytesInThisPeriod += bytes; } void LLCircuitData::addBytesOut(S32 bytes) { mBytesOut += bytes; mBytesOutThisPeriod += bytes; } void LLCircuitData::addReliablePacket(S32 mSocket, U8 *buf_ptr, S32 buf_len, LLReliablePacketParams *params) { LLReliablePacket *packet_info; packet_info = new LLReliablePacket(mSocket, buf_ptr, buf_len, params); mUnackedPacketCount++; mUnackedPacketBytes += packet_info->mBufferLength; if (params && params->mRetries) { mUnackedPackets[packet_info->mPacketID] = packet_info; } else { mFinalRetryPackets[packet_info->mPacketID] = packet_info; } } void LLCircuit::resendUnackedPackets(S32& unacked_list_length, S32& unacked_list_size) { F64 now = LLMessageSystem::getMessageTimeSeconds(); unacked_list_length = 0; unacked_list_size = 0; LLCircuitData* circ; circuit_data_map::iterator end = mUnackedCircuitMap.end(); for(circuit_data_map::iterator it = mUnackedCircuitMap.begin(); it != end; ++it) { circ = (*it).second; unacked_list_length += circ->resendUnackedPackets(now); unacked_list_size += circ->getUnackedPacketBytes(); } } BOOL LLCircuitData::isDuplicateResend(TPACKETID packetnum) { return (mRecentlyReceivedReliablePackets.find(packetnum) != mRecentlyReceivedReliablePackets.end()); } void LLCircuit::dumpResends() { circuit_data_map::iterator end = mCircuitData.end(); for(circuit_data_map::iterator it = mCircuitData.begin(); it != end; ++it) { (*it).second->dumpResendCountAndReset(); } } LLCircuitData* LLCircuit::findCircuit(const LLHost& host) const { // An optimization on finding the previously found circuit. if (mLastCircuit && (mLastCircuit->mHost == host)) { return mLastCircuit; } circuit_data_map::const_iterator it = mCircuitData.find(host); if(it == mCircuitData.end()) { return NULL; } mLastCircuit = it->second; return mLastCircuit; } BOOL LLCircuit::isCircuitAlive(const LLHost& host) const { LLCircuitData *cdp = findCircuit(host); if(cdp) { return cdp->mbAlive; } return FALSE; } void LLCircuitData::setTimeoutCallback(void (*callback_func)(const LLHost &host, void *user_data), void *user_data) { mTimeoutCallback = callback_func; mTimeoutUserData = user_data; } void LLCircuitData::checkPacketInID(TPACKETID id, BOOL receive_resent) { // Done as floats so we don't have to worry about running out of room // with U32 getting poked into an S32. F32 delta = (F32)mHighestPacketID - (F32)id; if (delta > (0.5f*LL_MAX_OUT_PACKET_ID)) { // We've almost definitely wrapped, reset the mLastPacketID to be low again. mHighestPacketID = id; } else if (delta < (-0.5f*LL_MAX_OUT_PACKET_ID)) { // This is almost definitely an old packet coming in after a wrap, ignore it. } else { mHighestPacketID = llmax(mHighestPacketID, id); } // Have we received anything on this circuit yet? if (0 == mPacketsIn) { // Must be first packet from unclosed circuit. mPacketsIn++; setPacketInID((id + 1) % LL_MAX_OUT_PACKET_ID); mLastPacketGap = 0; mOutOfOrderRate.count(0); return; } mPacketsIn++; // now, check to see if we've got a gap U32 gap = 0; if ((mPacketsInID == id)) { // nope! bump and wrap the counter, then return mPacketsInID++; mPacketsInID = (mPacketsInID) % LL_MAX_OUT_PACKET_ID; } else if (id < mWrapID) { // id < mWrapID will happen if the first few packets are out of order. . . // at that point we haven't marked anything "potentially lost" and // the out-of-order packet will cause a full wrap marking all the IDs "potentially lost" // do nothing } else { // we have a gap! if that id is in the map, remove it from the map, leave mCurrentCircuit->mPacketsInID // alone // otherwise, walk from mCurrentCircuit->mPacketsInID to id with wrapping, adding the values to the map // and setting mPacketsInID to id + 1 % LL_MAX_OUT_PACKET_ID // babbage: all operands in expression are unsigned, so modular // arithmetic will always find correct gap, regardless of wrap arounds. const U8 width = 24; gap = LLModularMath::subtract<width>(mPacketsInID, id); if (mPotentialLostPackets.find(id) != mPotentialLostPackets.end()) { if(gMessageSystem->mVerboseLog) { std::ostringstream str; str << "MSG: <- " << mHost << "\tRECOVERING LOST:\t" << id; llinfos << str.str() << llendl; } // llinfos << "removing potential lost: " << id << llendl; mPotentialLostPackets.erase(id); } else if (!receive_resent) // don't freak out over out-of-order reliable resends { U64 time = LLMessageSystem::getMessageTimeUsecs(); TPACKETID index = mPacketsInID; S32 gap_count = 0; if ((index < id) && ((id - index) < 16)) { while (index != id) { if(gMessageSystem->mVerboseLog) { std::ostringstream str; str << "MSG: <- " << mHost << "\tPACKET GAP:\t" << index; llinfos << str.str() << llendl; } // llinfos << "adding potential lost: " << index << llendl; mPotentialLostPackets[index] = time; index++; index = index % LL_MAX_OUT_PACKET_ID; gap_count++; } } else { llinfos << "packet_out_of_order - got packet " << id << " expecting " << index << " from " << mHost << llendl; if(gMessageSystem->mVerboseLog) { std::ostringstream str; str << "MSG: <- " << mHost << "\tPACKET GAP:\t" << id << " expected " << index; llinfos << str.str() << llendl; } } mPacketsInID = id + 1; mPacketsInID = (mPacketsInID) % LL_MAX_OUT_PACKET_ID; if (gap_count > 128) { llwarns << "Packet loss gap filler running amok!" << llendl; } else if (gap_count > 16) { llwarns << "Sustaining large amounts of packet loss!" << llendl; } } } mOutOfOrderRate.count(gap); mLastPacketGap = gap; } void LLCircuit::updateWatchDogTimers(LLMessageSystem *msgsys) { F64 cur_time = LLMessageSystem::getMessageTimeSeconds(); S32 count = mPingSet.size(); S32 cur = 0; // Only process each circuit once at most, stop processing if no circuits while((cur < count) && !mPingSet.empty()) { cur++; LLCircuit::ping_set_t::iterator psit = mPingSet.begin(); LLCircuitData *cdp = *psit; if (!cdp->mbAlive) { // We suspect that this case should never happen, given how // the alive status is set. // Skip over dead circuits, just add the ping interval and push it to the back // Always remember to remove it from the set before changing the sorting // key (mNextPingSendTime) mPingSet.erase(psit); cdp->mNextPingSendTime = cur_time + mHeartbeatInterval; mPingSet.insert(cdp); continue; } else { // Check to see if this needs a ping if (cur_time < cdp->mNextPingSendTime) { // This circuit doesn't need a ping, break out because // we have a sorted list, thus no more circuits need pings break; } // Update watchdog timers if (cdp->updateWatchDogTimers(msgsys)) { // Randomize our pings a bit by doing some up to 5% early or late F64 dt = 0.95f*mHeartbeatInterval + ll_frand(0.1f*mHeartbeatInterval); // Remove it, and reinsert it with the new next ping time. // Always remove before changing the sorting key. mPingSet.erase(psit); cdp->mNextPingSendTime = cur_time + dt; mPingSet.insert(cdp); // Update our throttles cdp->mThrottles.dynamicAdjust(); // Update some stats, this is not terribly important cdp->checkPeriodTime(); } else { // This mPingSet.erase isn't necessary, because removing the circuit will // remove the ping set. //mPingSet.erase(psit); removeCircuitData(cdp->mHost); } } } } BOOL LLCircuitData::updateWatchDogTimers(LLMessageSystem *msgsys) { F64 cur_time = LLMessageSystem::getMessageTimeSeconds(); mLastPingSendTime = cur_time; if (!checkCircuitTimeout()) { // Pass this back to the calling LLCircuit, this circuit needs to be cleaned up. return FALSE; } // WARNING! // Duplicate suppression can FAIL if packets are delivered out of // order, although it's EXTREMELY unlikely. It would require // that the ping get delivered out of order enough that the ACK // for the packet that it was out of order with was received BEFORE // the ping was sent. // Find the current oldest reliable packetID // This is to handle the case if we actually manage to wrap our // packet IDs - the oldest will actually have a higher packet ID // than the current. BOOL wrapped = FALSE; reliable_iter iter; iter = mUnackedPackets.upper_bound(getPacketOutID()); if (iter == mUnackedPackets.end()) { // Nothing AFTER this one, so we want the lowest packet ID // then. iter = mUnackedPackets.begin(); wrapped = TRUE; } TPACKETID packet_id = 0; // Check against the "final" packets BOOL wrapped_final = FALSE; reliable_iter iter_final; iter_final = mFinalRetryPackets.upper_bound(getPacketOutID()); if (iter_final == mFinalRetryPackets.end()) { iter_final = mFinalRetryPackets.begin(); wrapped_final = TRUE; } //llinfos << mHost << " - unacked count " << mUnackedPackets.size() << llendl; //llinfos << mHost << " - final count " << mFinalRetryPackets.size() << llendl; if (wrapped != wrapped_final) { // One of the "unacked" or "final" lists hasn't wrapped. Whichever one // hasn't has the oldest packet. if (!wrapped) { // Hasn't wrapped, so the one on the // unacked packet list is older packet_id = iter->first; //llinfos << mHost << ": nowrapped unacked" << llendl; } else { packet_id = iter_final->first; //llinfos << mHost << ": nowrapped final" << llendl; } } else { // They both wrapped, we can just use the minimum of the two. if ((iter == mUnackedPackets.end()) && (iter_final == mFinalRetryPackets.end())) { // Wow! No unacked packets at all! // Send the ID of the last packet we sent out. // This will flush all of the destination's // unacked packets, theoretically. //llinfos << mHost << ": No unacked!" << llendl; packet_id = getPacketOutID(); } else { BOOL had_unacked = FALSE; if (iter != mUnackedPackets.end()) { // Unacked list has the lowest so far packet_id = iter->first; had_unacked = TRUE; //llinfos << mHost << ": Unacked" << llendl; } if (iter_final != mFinalRetryPackets.end()) { // Use the lowest of the unacked list and the final list if (had_unacked) { // Both had a packet, use the lowest. packet_id = llmin(packet_id, iter_final->first); //llinfos << mHost << ": Min of unacked/final" << llendl; } else { // Only the final had a packet, use it. packet_id = iter_final->first; //llinfos << mHost << ": Final!" << llendl; } } } } // Send off the another ping. pingTimerStart(); msgsys->newMessageFast(_PREHASH_StartPingCheck); msgsys->nextBlock(_PREHASH_PingID); msgsys->addU8Fast(_PREHASH_PingID, nextPingID()); msgsys->addU32Fast(_PREHASH_OldestUnacked, packet_id); msgsys->sendMessage(mHost); // Also do lost packet accounting. // Check to see if anything on our lost list is old enough to // be considered lost LLCircuitData::packet_time_map::iterator it; U64 timeout = (U64)(1000000.0*llmin(LL_MAX_LOST_TIMEOUT, getPingDelayAveraged() * LL_LOST_TIMEOUT_FACTOR)); U64 mt_usec = LLMessageSystem::getMessageTimeUsecs(); for (it = mPotentialLostPackets.begin(); it != mPotentialLostPackets.end(); ) { U64 delta_t_usec = mt_usec - (*it).second; if (delta_t_usec > timeout) { // let's call this one a loss! mPacketsLost++; gMessageSystem->mDroppedPackets++; if(gMessageSystem->mVerboseLog) { std::ostringstream str; str << "MSG: <- " << mHost << "\tLOST PACKET:\t" << (*it).first; llinfos << str.str() << llendl; } mPotentialLostPackets.erase(it++); } else { ++it; } } return TRUE; } void LLCircuitData::clearDuplicateList(TPACKETID oldest_id) { // purge old data from the duplicate suppression queue // we want to KEEP all x where oldest_id <= x <= last incoming packet, and delete everything else. //llinfos << mHost << ": clearing before oldest " << oldest_id << llendl; //llinfos << "Recent list before: " << mRecentlyReceivedReliablePackets.size() << llendl; if (oldest_id < mHighestPacketID) { // Clean up everything with a packet ID less than oldest_id. packet_time_map::iterator pit_start; packet_time_map::iterator pit_end; pit_start = mRecentlyReceivedReliablePackets.begin(); pit_end = mRecentlyReceivedReliablePackets.lower_bound(oldest_id); mRecentlyReceivedReliablePackets.erase(pit_start, pit_end); } // Do timeout checks on everything with an ID > mHighestPacketID. // This should be empty except for wrapping IDs. Thus, this should be // highly rare. U64 mt_usec = LLMessageSystem::getMessageTimeUsecs(); packet_time_map::iterator pit; for(pit = mRecentlyReceivedReliablePackets.upper_bound(mHighestPacketID); pit != mRecentlyReceivedReliablePackets.end(); ) { // Validate that the packet ID seems far enough away if ((pit->first - mHighestPacketID) < 100) { llwarns << "Probably incorrectly timing out non-wrapped packets!" << llendl; } U64 delta_t_usec = mt_usec - (*pit).second; F64 delta_t_sec = delta_t_usec * SEC_PER_USEC; if (delta_t_sec > LL_DUPLICATE_SUPPRESSION_TIMEOUT) { // enough time has elapsed we're not likely to get a duplicate on this one llinfos << "Clearing " << pit->first << " from recent list" << llendl; mRecentlyReceivedReliablePackets.erase(pit++); } else { ++pit; } } //llinfos << "Recent list after: " << mRecentlyReceivedReliablePackets.size() << llendl; } BOOL LLCircuitData::checkCircuitTimeout() { F64 time_since_last_ping = LLMessageSystem::getMessageTimeSeconds() - mLastPingReceivedTime; // Nota Bene: This needs to be turned off if you are debugging multiple simulators if (time_since_last_ping > mHeartbeatTimeout) { llwarns << "LLCircuitData::checkCircuitTimeout for " << mHost << " last ping " << time_since_last_ping << " seconds ago." <<llendl; setAlive(FALSE); if (mTimeoutCallback) { llwarns << "LLCircuitData::checkCircuitTimeout for " << mHost << " calling callback." << llendl; mTimeoutCallback(mHost, mTimeoutUserData); } if (!isAlive()) { // The callback didn't try and resurrect the circuit. We should kill it. llwarns << "LLCircuitData::checkCircuitTimeout for " << mHost << " still dead, dropping." << llendl; return FALSE; } } return TRUE; } // Call this method when a reliable message comes in - this will // correctly place the packet in the correct list to be acked later. BOOL LLCircuitData::collectRAck(TPACKETID packet_num) { if (mAcks.empty()) { // First extra ack, we need to add ourselves to the list of circuits that need to send acks gMessageSystem->mCircuitInfo.mSendAckMap[mHost] = this; } mAcks.push_back(packet_num); return TRUE; } // this method is called during the message system processAcks() to // send out any acks that did not get sent already. void LLCircuit::sendAcks() { LLCircuitData* cd; circuit_data_map::iterator end = mSendAckMap.end(); for(circuit_data_map::iterator it = mSendAckMap.begin(); it != end; ++it) { cd = (*it).second; S32 count = (S32)cd->mAcks.size(); if(count > 0) { // send the packet acks S32 acks_this_packet = 0; for(S32 i = 0; i < count; ++i) { if(acks_this_packet == 0) { gMessageSystem->newMessageFast(_PREHASH_PacketAck); } gMessageSystem->nextBlockFast(_PREHASH_Packets); gMessageSystem->addU32Fast(_PREHASH_ID, cd->mAcks[i]); ++acks_this_packet; if(acks_this_packet > 250) { gMessageSystem->sendMessage(cd->mHost); acks_this_packet = 0; } } if(acks_this_packet > 0) { gMessageSystem->sendMessage(cd->mHost); } if(gMessageSystem->mVerboseLog) { std::ostringstream str; str << "MSG: -> " << cd->mHost << "\tPACKET ACKS:\t"; std::ostream_iterator<TPACKETID> append(str, " "); std::copy(cd->mAcks.begin(), cd->mAcks.end(), append); llinfos << str.str() << llendl; } // empty out the acks list cd->mAcks.clear(); } } // All acks have been sent, clear the map mSendAckMap.clear(); } std::ostream& operator<<(std::ostream& s, LLCircuitData& circuit) { F32 age = circuit.mExistenceTimer.getElapsedTimeF32(); using namespace std; s << "Circuit " << circuit.mHost << " "; s << circuit.mRemoteID << " "; s << (circuit.mbAlive ? "Alive" : "Not Alive") << " "; s << (circuit.mbAllowTimeout ? "Timeout Allowed" : "Timeout Not Allowed"); s << endl; s << " Packets Lost: " << circuit.mPacketsLost; s << " Measured Ping: " << circuit.mPingDelay; s << " Averaged Ping: " << circuit.mPingDelayAveraged; s << endl; s << "Global In/Out " << S32(age) << " sec"; s << " KBytes: " << circuit.mBytesIn / 1024 << "/" << circuit.mBytesOut / 1024; s << " Kbps: "; s << S32(circuit.mBytesIn * 8.f / circuit.mExistenceTimer.getElapsedTimeF32() / 1024.f); s << "/"; s << S32(circuit.mBytesOut * 8.f / circuit.mExistenceTimer.getElapsedTimeF32() / 1024.f); s << " Packets: " << circuit.mPacketsIn << "/" << circuit.mPacketsOut; s << endl; s << "Recent In/Out " << S32(circuit.mLastPeriodLength) << " sec"; s << " KBytes: "; s << circuit.mBytesInLastPeriod / 1024; s << "/"; s << circuit.mBytesOutLastPeriod / 1024; s << " Kbps: "; s << S32(circuit.mBytesInLastPeriod * 8.f / circuit.mLastPeriodLength / 1024.f); s << "/"; s << S32(circuit.mBytesOutLastPeriod * 8.f / circuit.mLastPeriodLength / 1024.f); s << " Peak kbps: "; s << S32(circuit.mPeakBPSIn / 1024.f); s << "/"; s << S32(circuit.mPeakBPSOut / 1024.f); s << endl; return s; } void LLCircuitData::getInfo(LLSD& info) const { info["Host"] = mHost.getIPandPort(); info["Alive"] = mbAlive; info["Age"] = mExistenceTimer.getElapsedTimeF32(); } void LLCircuitData::dumpResendCountAndReset() { if (mCurrentResendCount) { llinfos << "Circuit: " << mHost << " resent " << mCurrentResendCount << " packets" << llendl; mCurrentResendCount = 0; } } std::ostream& operator<<(std::ostream& s, LLCircuit &circuit) { s << "Circuit Info:" << std::endl; LLCircuit::circuit_data_map::iterator end = circuit.mCircuitData.end(); LLCircuit::circuit_data_map::iterator it; for(it = circuit.mCircuitData.begin(); it != end; ++it) { s << *((*it).second) << std::endl; } return s; } void LLCircuit::getInfo(LLSD& info) const { LLCircuit::circuit_data_map::const_iterator end = mCircuitData.end(); LLCircuit::circuit_data_map::const_iterator it; LLSD circuit_info; for(it = mCircuitData.begin(); it != end; ++it) { (*it).second->getInfo(circuit_info); info["Circuits"].append(circuit_info); } } void LLCircuit::getCircuitRange( const LLHost& key, LLCircuit::circuit_data_map::iterator& first, LLCircuit::circuit_data_map::iterator& end) { end = mCircuitData.end(); first = mCircuitData.upper_bound(key); } TPACKETID LLCircuitData::nextPacketOutID() { mPacketsOut++; TPACKETID id; id = (mPacketsOutID + 1) % LL_MAX_OUT_PACKET_ID; if (id < mPacketsOutID) { // we just wrapped on a circuit, reset the wrap ID to zero mWrapID = 0; } mPacketsOutID = id; return id; } void LLCircuitData::setPacketInID(TPACKETID id) { id = id % LL_MAX_OUT_PACKET_ID; mPacketsInID = id; mRecentlyReceivedReliablePackets.clear(); mWrapID = id; } void LLCircuitData::pingTimerStop(const U8 ping_id) { F64 mt_secs = LLMessageSystem::getMessageTimeSeconds(); // Nota Bene: no averaging of ping times until we get a feel for how this works F64 time = mt_secs - mPingTime; if (time == 0.0) { // Ack, we got our ping response on the same frame! Sigh, let's get a real time otherwise // all of our ping calculations will be skewed. mt_secs = LLMessageSystem::getMessageTimeSeconds(TRUE); } mLastPingReceivedTime = mt_secs; // If ping is longer than 1 second, we'll get sequence deltas in the ping. // Approximate by assuming each ping counts for 1 second (slightly low, probably) S32 delta_ping = (S32)mLastPingID - (S32) ping_id; if (delta_ping < 0) { delta_ping += 256; } U32 msec = (U32) ((delta_ping*mHeartbeatInterval + time) * 1000.f); setPingDelay(msec); mPingsInTransit = delta_ping; if (mBlocked && (mPingsInTransit <= PING_RELEASE_BLOCK)) { mBlocked = FALSE; } } void LLCircuitData::pingTimerStart() { mPingTime = LLMessageSystem::getMessageTimeSeconds(); mPingsInTransit++; if (!mBlocked && (mPingsInTransit > PING_START_BLOCK)) { mBlocked = TRUE; } } U32 LLCircuitData::getPacketsIn() const { return mPacketsIn; } S32 LLCircuitData::getBytesIn() const { return mBytesIn; } S32 LLCircuitData::getBytesOut() const { return mBytesOut; } U32 LLCircuitData::getPacketsOut() const { return mPacketsOut; } TPACKETID LLCircuitData::getPacketOutID() const { return mPacketsOutID; } U32 LLCircuitData::getPacketsLost() const { return mPacketsLost; } BOOL LLCircuitData::isAlive() const { return mbAlive; } BOOL LLCircuitData::isBlocked() const { return mBlocked; } BOOL LLCircuitData::getAllowTimeout() const { return mbAllowTimeout; } U32 LLCircuitData::getPingDelay() const { return mPingDelay; } F32 LLCircuitData::getPingInTransitTime() { // This may be inaccurate in the case of a circuit that was "dead" and then revived, // but only until the first round trip ping is sent - djs F32 time_since_ping_was_sent = 0; if (mPingsInTransit) { time_since_ping_was_sent = (F32)((mPingsInTransit*mHeartbeatInterval - 1) + (LLMessageSystem::getMessageTimeSeconds() - mPingTime))*1000.f; } return time_since_ping_was_sent; } void LLCircuitData::setPingDelay(U32 ping) { mPingDelay = ping; mPingDelayAveraged = llmax((F32)ping, getPingDelayAveraged()); mPingDelayAveraged = ((1.f - LL_AVERAGED_PING_ALPHA) * mPingDelayAveraged) + (LL_AVERAGED_PING_ALPHA * (F32) ping); mPingDelayAveraged = llclamp(mPingDelayAveraged, LL_AVERAGED_PING_MIN, LL_AVERAGED_PING_MAX); } F32 LLCircuitData::getPingDelayAveraged() { return llmin(llmax(getPingInTransitTime(), mPingDelayAveraged), LL_AVERAGED_PING_MAX); } BOOL LLCircuitData::getTrusted() const { return mTrusted; } void LLCircuitData::setTrusted(BOOL t) { mTrusted = t; } F32 LLCircuitData::getAgeInSeconds() const { return mExistenceTimer.getElapsedTimeF32(); }