summaryrefslogtreecommitdiff
path: root/indra/llmessage/lliosocket.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'indra/llmessage/lliosocket.cpp')
-rw-r--r--indra/llmessage/lliosocket.cpp1034
1 files changed, 517 insertions, 517 deletions
diff --git a/indra/llmessage/lliosocket.cpp b/indra/llmessage/lliosocket.cpp
index 321d7286eb..a14d10fe5f 100644
--- a/indra/llmessage/lliosocket.cpp
+++ b/indra/llmessage/lliosocket.cpp
@@ -1,4 +1,4 @@
-/**
+/**
* @file lliosocket.cpp
* @author Phoenix
* @date 2005-07-31
@@ -7,21 +7,21 @@
* $LicenseInfo:firstyear=2005&license=viewerlgpl$
* Second Life Viewer Source Code
* Copyright (C) 2010, Linden Research, Inc.
- *
+ *
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation;
* version 2.1 of the License only.
- *
+ *
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
- *
+ *
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
- *
+ *
* Linden Research, Inc., 945 Battery Street, San Francisco, CA 94111 USA
* $/LicenseInfo$
*/
@@ -50,15 +50,15 @@ static const S32 LL_RECV_BUFFER_SIZE = 40000;
//static const U16 LL_PORT_DISCOVERY_RANGE_MAX = 13050;
//
-// local methods
+// local methods
//
bool is_addr_in_use(apr_status_t status)
{
#if LL_WINDOWS
- return (WSAEADDRINUSE == APR_TO_OS_ERROR(status));
+ return (WSAEADDRINUSE == APR_TO_OS_ERROR(status));
#else
- return (EADDRINUSE == APR_TO_OS_ERROR(status));
+ return (EADDRINUSE == APR_TO_OS_ERROR(status));
#endif
}
@@ -71,28 +71,28 @@ bool is_addr_in_use(apr_status_t status)
#endif
-// Quick function
+// Quick function
void ll_debug_socket(const char* msg, apr_socket_t* apr_sock)
{
#if LL_DEBUG_SOCKET_FILE_DESCRIPTORS
- if(!apr_sock)
- {
- LL_DEBUGS("Socket") << "Socket -- " << (msg?msg:"") << ": no socket." << LL_ENDL;
- return;
- }
- // *TODO: Why doesn't this work?
- //apr_os_sock_t os_sock;
- int os_sock;
- if(APR_SUCCESS == apr_os_sock_get(&os_sock, apr_sock))
- {
- LL_DEBUGS("Socket") << "Socket -- " << (msg?msg:"") << " on fd " << os_sock
- << " at " << apr_sock << LL_ENDL;
- }
- else
- {
- LL_DEBUGS("Socket") << "Socket -- " << (msg?msg:"") << " no fd "
- << " at " << apr_sock << LL_ENDL;
- }
+ if(!apr_sock)
+ {
+ LL_DEBUGS("Socket") << "Socket -- " << (msg?msg:"") << ": no socket." << LL_ENDL;
+ return;
+ }
+ // *TODO: Why doesn't this work?
+ //apr_os_sock_t os_sock;
+ int os_sock;
+ if(APR_SUCCESS == apr_os_sock_get(&os_sock, apr_sock))
+ {
+ LL_DEBUGS("Socket") << "Socket -- " << (msg?msg:"") << " on fd " << os_sock
+ << " at " << apr_sock << LL_ENDL;
+ }
+ else
+ {
+ LL_DEBUGS("Socket") << "Socket -- " << (msg?msg:"") << " no fd "
+ << " at " << apr_sock << LL_ENDL;
+ }
#endif
}
@@ -103,162 +103,162 @@ void ll_debug_socket(const char* msg, apr_socket_t* apr_sock)
// static
LLSocket::ptr_t LLSocket::create(apr_pool_t* pool, EType type, U16 port, const char *hostname)
{
- LLSocket::ptr_t rv;
- apr_socket_t* socket = NULL;
- apr_pool_t* new_pool = NULL;
- apr_status_t status = APR_EGENERAL;
-
- // create a pool for the socket
- status = apr_pool_create(&new_pool, pool);
- if(ll_apr_warn_status(status))
- {
- if(new_pool) apr_pool_destroy(new_pool);
- return rv;
- }
-
- if(STREAM_TCP == type)
- {
- status = apr_socket_create(
- &socket,
- APR_INET,
- SOCK_STREAM,
- APR_PROTO_TCP,
- new_pool);
- }
- else if(DATAGRAM_UDP == type)
- {
- status = apr_socket_create(
- &socket,
- APR_INET,
- SOCK_DGRAM,
- APR_PROTO_UDP,
- new_pool);
- }
- else
- {
- if(new_pool) apr_pool_destroy(new_pool);
- return rv;
- }
- if(ll_apr_warn_status(status))
- {
- if(new_pool) apr_pool_destroy(new_pool);
- return rv;
- }
- // At this point, the new LLSocket instance takes ownership of new_pool,
- // which is why no early return below this call explicitly destroys it: it
- // is instead cleaned up by ~LLSocket().
- rv = ptr_t(new LLSocket(socket, new_pool));
- if(port > 0)
- {
- apr_sockaddr_t* sa = NULL;
- status = apr_sockaddr_info_get(
- &sa,
- hostname,
- APR_UNSPEC,
- port,
- 0,
- new_pool);
- if(ll_apr_warn_status(status))
- {
- rv.reset();
- return rv;
- }
- // This allows us to reuse the address on quick down/up. This
- // is unlikely to create problems.
- ll_apr_warn_status(apr_socket_opt_set(socket, APR_SO_REUSEADDR, 1));
- status = apr_socket_bind(socket, sa);
- if(ll_apr_warn_status(status))
- {
- rv.reset();
- return rv;
- }
- LL_DEBUGS() << "Bound " << ((DATAGRAM_UDP == type) ? "udp" : "tcp")
- << " socket to port: " << sa->port << LL_ENDL;
- if(STREAM_TCP == type)
- {
- // If it's a stream based socket, we need to tell the OS
- // to keep a queue of incoming connections for ACCEPT.
- LL_DEBUGS() << "Setting listen state for socket." << LL_ENDL;
- status = apr_socket_listen(
- socket,
- LL_DEFAULT_LISTEN_BACKLOG);
- if(ll_apr_warn_status(status))
- {
- rv.reset();
- return rv;
- }
- }
- }
- else // port <= 0
- {
- // we need to indicate that we have an ephemeral port if the
- // previous calls were successful. It will
- port = PORT_EPHEMERAL;
- }
- rv->mPort = port;
- rv->setNonBlocking();
- return rv;
+ LLSocket::ptr_t rv;
+ apr_socket_t* socket = NULL;
+ apr_pool_t* new_pool = NULL;
+ apr_status_t status = APR_EGENERAL;
+
+ // create a pool for the socket
+ status = apr_pool_create(&new_pool, pool);
+ if(ll_apr_warn_status(status))
+ {
+ if(new_pool) apr_pool_destroy(new_pool);
+ return rv;
+ }
+
+ if(STREAM_TCP == type)
+ {
+ status = apr_socket_create(
+ &socket,
+ APR_INET,
+ SOCK_STREAM,
+ APR_PROTO_TCP,
+ new_pool);
+ }
+ else if(DATAGRAM_UDP == type)
+ {
+ status = apr_socket_create(
+ &socket,
+ APR_INET,
+ SOCK_DGRAM,
+ APR_PROTO_UDP,
+ new_pool);
+ }
+ else
+ {
+ if(new_pool) apr_pool_destroy(new_pool);
+ return rv;
+ }
+ if(ll_apr_warn_status(status))
+ {
+ if(new_pool) apr_pool_destroy(new_pool);
+ return rv;
+ }
+ // At this point, the new LLSocket instance takes ownership of new_pool,
+ // which is why no early return below this call explicitly destroys it: it
+ // is instead cleaned up by ~LLSocket().
+ rv = ptr_t(new LLSocket(socket, new_pool));
+ if(port > 0)
+ {
+ apr_sockaddr_t* sa = NULL;
+ status = apr_sockaddr_info_get(
+ &sa,
+ hostname,
+ APR_UNSPEC,
+ port,
+ 0,
+ new_pool);
+ if(ll_apr_warn_status(status))
+ {
+ rv.reset();
+ return rv;
+ }
+ // This allows us to reuse the address on quick down/up. This
+ // is unlikely to create problems.
+ ll_apr_warn_status(apr_socket_opt_set(socket, APR_SO_REUSEADDR, 1));
+ status = apr_socket_bind(socket, sa);
+ if(ll_apr_warn_status(status))
+ {
+ rv.reset();
+ return rv;
+ }
+ LL_DEBUGS() << "Bound " << ((DATAGRAM_UDP == type) ? "udp" : "tcp")
+ << " socket to port: " << sa->port << LL_ENDL;
+ if(STREAM_TCP == type)
+ {
+ // If it's a stream based socket, we need to tell the OS
+ // to keep a queue of incoming connections for ACCEPT.
+ LL_DEBUGS() << "Setting listen state for socket." << LL_ENDL;
+ status = apr_socket_listen(
+ socket,
+ LL_DEFAULT_LISTEN_BACKLOG);
+ if(ll_apr_warn_status(status))
+ {
+ rv.reset();
+ return rv;
+ }
+ }
+ }
+ else // port <= 0
+ {
+ // we need to indicate that we have an ephemeral port if the
+ // previous calls were successful. It will
+ port = PORT_EPHEMERAL;
+ }
+ rv->mPort = port;
+ rv->setNonBlocking();
+ return rv;
}
// static
LLSocket::ptr_t LLSocket::create(apr_socket_t* socket, apr_pool_t* pool)
{
- LLSocket::ptr_t rv;
- if(!socket)
- {
- return rv;
- }
- rv = ptr_t(new LLSocket(socket, pool));
- rv->mPort = PORT_EPHEMERAL;
- rv->setNonBlocking();
- return rv;
+ LLSocket::ptr_t rv;
+ if(!socket)
+ {
+ return rv;
+ }
+ rv = ptr_t(new LLSocket(socket, pool));
+ rv->mPort = PORT_EPHEMERAL;
+ rv->setNonBlocking();
+ return rv;
}
bool LLSocket::blockingConnect(const LLHost& host)
{
- if(!mSocket) return false;
- apr_sockaddr_t* sa = NULL;
- std::string ip_address;
- ip_address = host.getIPString();
- if(ll_apr_warn_status(apr_sockaddr_info_get(
- &sa,
- ip_address.c_str(),
- APR_UNSPEC,
- host.getPort(),
- 0,
- mPool)))
- {
- return false;
- }
- setBlocking(1000);
- ll_debug_socket("Blocking connect", mSocket);
- if(ll_apr_warn_status(apr_socket_connect(mSocket, sa))) return false;
- setNonBlocking();
- return true;
+ if(!mSocket) return false;
+ apr_sockaddr_t* sa = NULL;
+ std::string ip_address;
+ ip_address = host.getIPString();
+ if(ll_apr_warn_status(apr_sockaddr_info_get(
+ &sa,
+ ip_address.c_str(),
+ APR_UNSPEC,
+ host.getPort(),
+ 0,
+ mPool)))
+ {
+ return false;
+ }
+ setBlocking(1000);
+ ll_debug_socket("Blocking connect", mSocket);
+ if(ll_apr_warn_status(apr_socket_connect(mSocket, sa))) return false;
+ setNonBlocking();
+ return true;
}
LLSocket::LLSocket(apr_socket_t* socket, apr_pool_t* pool) :
- mSocket(socket),
- mPool(pool),
- mPort(PORT_INVALID)
+ mSocket(socket),
+ mPool(pool),
+ mPort(PORT_INVALID)
{
- ll_debug_socket("Constructing wholely formed socket", mSocket);
+ ll_debug_socket("Constructing wholely formed socket", mSocket);
}
LLSocket::~LLSocket()
{
- // *FIX: clean up memory we are holding.
- if(mSocket)
- {
- ll_debug_socket("Destroying socket", mSocket);
- apr_socket_close(mSocket);
- mSocket = NULL;
- }
- if(mPool)
- {
- apr_pool_destroy(mPool);
- }
+ // *FIX: clean up memory we are holding.
+ if(mSocket)
+ {
+ ll_debug_socket("Destroying socket", mSocket);
+ apr_socket_close(mSocket);
+ mSocket = NULL;
+ }
+ if(mPool)
+ {
+ apr_pool_destroy(mPool);
+ }
}
// See http://dev.ariel-networks.com/apr/apr-tutorial/html/apr-tutorial-13.html#ss13.4
@@ -267,21 +267,21 @@ LLSocket::~LLSocket()
void LLSocket::setBlocking(S32 timeout)
{
- // set up the socket options
- ll_apr_warn_status(apr_socket_timeout_set(mSocket, timeout)); // Sets both receive and send timeout SO_RCVTIMEO, SO_SNDTIMEO
- ll_apr_warn_status(apr_socket_opt_set(mSocket, APR_SO_NONBLOCK, 0));
- ll_apr_warn_status(apr_socket_opt_set(mSocket, APR_SO_SNDBUF, LL_SEND_BUFFER_SIZE));
- ll_apr_warn_status(apr_socket_opt_set(mSocket, APR_SO_RCVBUF, LL_RECV_BUFFER_SIZE));
+ // set up the socket options
+ ll_apr_warn_status(apr_socket_timeout_set(mSocket, timeout)); // Sets both receive and send timeout SO_RCVTIMEO, SO_SNDTIMEO
+ ll_apr_warn_status(apr_socket_opt_set(mSocket, APR_SO_NONBLOCK, 0));
+ ll_apr_warn_status(apr_socket_opt_set(mSocket, APR_SO_SNDBUF, LL_SEND_BUFFER_SIZE));
+ ll_apr_warn_status(apr_socket_opt_set(mSocket, APR_SO_RCVBUF, LL_RECV_BUFFER_SIZE));
}
void LLSocket::setNonBlocking()
{
- // set up the socket options
- ll_apr_warn_status(apr_socket_timeout_set(mSocket, 0));
- ll_apr_warn_status(apr_socket_opt_set(mSocket, APR_SO_NONBLOCK, 1));
- ll_apr_warn_status(apr_socket_opt_set(mSocket, APR_SO_SNDBUF, LL_SEND_BUFFER_SIZE));
- ll_apr_warn_status(apr_socket_opt_set(mSocket, APR_SO_RCVBUF, LL_RECV_BUFFER_SIZE));
+ // set up the socket options
+ ll_apr_warn_status(apr_socket_timeout_set(mSocket, 0));
+ ll_apr_warn_status(apr_socket_opt_set(mSocket, APR_SO_NONBLOCK, 1));
+ ll_apr_warn_status(apr_socket_opt_set(mSocket, APR_SO_SNDBUF, LL_SEND_BUFFER_SIZE));
+ ll_apr_warn_status(apr_socket_opt_set(mSocket, APR_SO_RCVBUF, LL_RECV_BUFFER_SIZE));
}
@@ -290,96 +290,96 @@ void LLSocket::setNonBlocking()
///
LLIOSocketReader::LLIOSocketReader(LLSocket::ptr_t socket) :
- mSource(socket),
- mInitialized(false)
+ mSource(socket),
+ mInitialized(false)
{
}
LLIOSocketReader::~LLIOSocketReader()
{
- //LL_DEBUGS() << "Destroying LLIOSocketReader" << LL_ENDL;
+ //LL_DEBUGS() << "Destroying LLIOSocketReader" << LL_ENDL;
}
// virtual
LLIOPipe::EStatus LLIOSocketReader::process_impl(
- const LLChannelDescriptors& channels,
- buffer_ptr_t& buffer,
- bool& eos,
- LLSD& context,
- LLPumpIO* pump)
+ const LLChannelDescriptors& channels,
+ buffer_ptr_t& buffer,
+ bool& eos,
+ LLSD& context,
+ LLPumpIO* pump)
{
LL_PROFILE_ZONE_SCOPED;
- PUMP_DEBUG;
- if(!mSource) return STATUS_PRECONDITION_NOT_MET;
- if(!mInitialized)
- {
- PUMP_DEBUG;
- // Since the read will not block, it's ok to initialize and
- // attempt to read off the descriptor immediately.
- mInitialized = true;
- if(pump)
- {
- PUMP_DEBUG;
- LL_DEBUGS() << "Initializing poll descriptor for LLIOSocketReader."
- << LL_ENDL;
- apr_pollfd_t poll_fd;
- poll_fd.p = NULL;
- poll_fd.desc_type = APR_POLL_SOCKET;
- poll_fd.reqevents = APR_POLLIN;
- poll_fd.rtnevents = 0x0;
- poll_fd.desc.s = mSource->getSocket();
- poll_fd.client_data = NULL;
- pump->setConditional(this, &poll_fd);
- }
- }
- //if(!buffer)
- //{
- // buffer = new LLBufferArray;
- //}
- PUMP_DEBUG;
- const apr_size_t READ_BUFFER_SIZE = 1024;
- char read_buf[READ_BUFFER_SIZE]; /*Flawfinder: ignore*/
- apr_size_t len;
- apr_status_t status = APR_SUCCESS;
- do
- {
- PUMP_DEBUG;
- len = READ_BUFFER_SIZE;
- status = apr_socket_recv(mSource->getSocket(), read_buf, &len);
- buffer->append(channels.out(), (U8*)read_buf, len);
- } while((APR_SUCCESS == status) && (READ_BUFFER_SIZE == len));
- LL_DEBUGS() << "socket read status: " << status << LL_ENDL;
- LLIOPipe::EStatus rv = STATUS_OK;
-
- PUMP_DEBUG;
- // *FIX: Also need to check for broken pipe
- if(APR_STATUS_IS_EOF(status))
- {
- // *FIX: Should we shut down the socket read?
- if(pump)
- {
- pump->setConditional(this, NULL);
- }
- rv = STATUS_DONE;
- eos = true;
- }
- else if(APR_STATUS_IS_EAGAIN(status))
- {
+ PUMP_DEBUG;
+ if(!mSource) return STATUS_PRECONDITION_NOT_MET;
+ if(!mInitialized)
+ {
+ PUMP_DEBUG;
+ // Since the read will not block, it's ok to initialize and
+ // attempt to read off the descriptor immediately.
+ mInitialized = true;
+ if(pump)
+ {
+ PUMP_DEBUG;
+ LL_DEBUGS() << "Initializing poll descriptor for LLIOSocketReader."
+ << LL_ENDL;
+ apr_pollfd_t poll_fd;
+ poll_fd.p = NULL;
+ poll_fd.desc_type = APR_POLL_SOCKET;
+ poll_fd.reqevents = APR_POLLIN;
+ poll_fd.rtnevents = 0x0;
+ poll_fd.desc.s = mSource->getSocket();
+ poll_fd.client_data = NULL;
+ pump->setConditional(this, &poll_fd);
+ }
+ }
+ //if(!buffer)
+ //{
+ // buffer = new LLBufferArray;
+ //}
+ PUMP_DEBUG;
+ const apr_size_t READ_BUFFER_SIZE = 1024;
+ char read_buf[READ_BUFFER_SIZE]; /*Flawfinder: ignore*/
+ apr_size_t len;
+ apr_status_t status = APR_SUCCESS;
+ do
+ {
+ PUMP_DEBUG;
+ len = READ_BUFFER_SIZE;
+ status = apr_socket_recv(mSource->getSocket(), read_buf, &len);
+ buffer->append(channels.out(), (U8*)read_buf, len);
+ } while((APR_SUCCESS == status) && (READ_BUFFER_SIZE == len));
+ LL_DEBUGS() << "socket read status: " << status << LL_ENDL;
+ LLIOPipe::EStatus rv = STATUS_OK;
+
+ PUMP_DEBUG;
+ // *FIX: Also need to check for broken pipe
+ if(APR_STATUS_IS_EOF(status))
+ {
+ // *FIX: Should we shut down the socket read?
+ if(pump)
+ {
+ pump->setConditional(this, NULL);
+ }
+ rv = STATUS_DONE;
+ eos = true;
+ }
+ else if(APR_STATUS_IS_EAGAIN(status))
+ {
/*Commented out by Aura 9-9-8 for DEV-19961.
- // everything is fine, but we can terminate this process pump.
-
- rv = STATUS_BREAK;
+ // everything is fine, but we can terminate this process pump.
+
+ rv = STATUS_BREAK;
*/
- }
- else
- {
- if(ll_apr_warn_status(status))
- {
- rv = STATUS_ERROR;
- }
- }
- PUMP_DEBUG;
- return rv;
+ }
+ else
+ {
+ if(ll_apr_warn_status(status))
+ {
+ rv = STATUS_ERROR;
+ }
+ }
+ PUMP_DEBUG;
+ return rv;
}
///
@@ -387,143 +387,143 @@ LLIOPipe::EStatus LLIOSocketReader::process_impl(
///
LLIOSocketWriter::LLIOSocketWriter(LLSocket::ptr_t socket) :
- mDestination(socket),
- mLastWritten(NULL),
- mInitialized(false)
+ mDestination(socket),
+ mLastWritten(NULL),
+ mInitialized(false)
{
}
LLIOSocketWriter::~LLIOSocketWriter()
{
- //LL_DEBUGS() << "Destroying LLIOSocketWriter" << LL_ENDL;
+ //LL_DEBUGS() << "Destroying LLIOSocketWriter" << LL_ENDL;
}
// virtual
LLIOPipe::EStatus LLIOSocketWriter::process_impl(
- const LLChannelDescriptors& channels,
- buffer_ptr_t& buffer,
- bool& eos,
- LLSD& context,
- LLPumpIO* pump)
+ const LLChannelDescriptors& channels,
+ buffer_ptr_t& buffer,
+ bool& eos,
+ LLSD& context,
+ LLPumpIO* pump)
{
LL_PROFILE_ZONE_SCOPED;
- PUMP_DEBUG;
- if(!mDestination) return STATUS_PRECONDITION_NOT_MET;
- if(!mInitialized)
- {
- PUMP_DEBUG;
- // Since the write will not block, it's ok to initialize and
- // attempt to write immediately.
- mInitialized = true;
- if(pump)
- {
- PUMP_DEBUG;
- LL_DEBUGS() << "Initializing poll descriptor for LLIOSocketWriter."
- << LL_ENDL;
- apr_pollfd_t poll_fd;
- poll_fd.p = NULL;
- poll_fd.desc_type = APR_POLL_SOCKET;
- poll_fd.reqevents = APR_POLLOUT;
- poll_fd.rtnevents = 0x0;
- poll_fd.desc.s = mDestination->getSocket();
- poll_fd.client_data = NULL;
- pump->setConditional(this, &poll_fd);
- }
- }
-
- PUMP_DEBUG;
- // *FIX: Some sort of writev implementation would be much more
- // efficient - not only because writev() is better, but also
- // because we won't have to do as much work to find the start
- // address.
- buffer->lock();
- LLBufferArray::segment_iterator_t it;
- LLBufferArray::segment_iterator_t end = buffer->endSegment();
- LLSegment segment;
- it = buffer->constructSegmentAfter(mLastWritten, segment);
- /*
- if(NULL == mLastWritten)
- {
- it = buffer->beginSegment();
- segment = (*it);
- }
- else
- {
- it = buffer->getSegment(mLastWritten);
- segment = (*it);
- S32 size = segment.size();
- U8* data = segment.data();
- if((data + size) == mLastWritten)
- {
- ++it;
- segment = (*it);
- }
- else
- {
- // *FIX: check the math on this one
- segment = LLSegment(
- (*it).getChannelMask(),
- mLastWritten + 1,
- size - (mLastWritten - data));
- }
- }
- */
-
- PUMP_DEBUG;
- apr_size_t len;
- bool done = false;
- apr_status_t status = APR_SUCCESS;
- while(it != end)
- {
-
- PUMP_DEBUG;
- if((*it).isOnChannel(channels.in()))
- {
- PUMP_DEBUG;
- len = (apr_size_t)segment.size();
- status = apr_socket_send(
- mDestination->getSocket(),
- (const char*)segment.data(),
- &len);
- // We sometimes get a 'non-blocking socket operation could not be
- // completed immediately' error from apr_socket_send. In this
- // case we break and the data will be sent the next time the chain
- // is pumped.
- if(APR_STATUS_IS_EAGAIN(status))
- {
- ll_apr_warn_status(status);
- break;
- }
-
- mLastWritten = segment.data() + len - 1;
-
- PUMP_DEBUG;
- if((S32)len < segment.size())
- {
- break;
- }
-
- }
-
- ++it;
- if(it != end)
- {
- segment = (*it);
- }
- else
- {
- done = true;
- }
-
- }
- buffer->unlock();
-
- PUMP_DEBUG;
- if(done && eos)
- {
- return STATUS_DONE;
- }
- return STATUS_OK;
+ PUMP_DEBUG;
+ if(!mDestination) return STATUS_PRECONDITION_NOT_MET;
+ if(!mInitialized)
+ {
+ PUMP_DEBUG;
+ // Since the write will not block, it's ok to initialize and
+ // attempt to write immediately.
+ mInitialized = true;
+ if(pump)
+ {
+ PUMP_DEBUG;
+ LL_DEBUGS() << "Initializing poll descriptor for LLIOSocketWriter."
+ << LL_ENDL;
+ apr_pollfd_t poll_fd;
+ poll_fd.p = NULL;
+ poll_fd.desc_type = APR_POLL_SOCKET;
+ poll_fd.reqevents = APR_POLLOUT;
+ poll_fd.rtnevents = 0x0;
+ poll_fd.desc.s = mDestination->getSocket();
+ poll_fd.client_data = NULL;
+ pump->setConditional(this, &poll_fd);
+ }
+ }
+
+ PUMP_DEBUG;
+ // *FIX: Some sort of writev implementation would be much more
+ // efficient - not only because writev() is better, but also
+ // because we won't have to do as much work to find the start
+ // address.
+ buffer->lock();
+ LLBufferArray::segment_iterator_t it;
+ LLBufferArray::segment_iterator_t end = buffer->endSegment();
+ LLSegment segment;
+ it = buffer->constructSegmentAfter(mLastWritten, segment);
+ /*
+ if(NULL == mLastWritten)
+ {
+ it = buffer->beginSegment();
+ segment = (*it);
+ }
+ else
+ {
+ it = buffer->getSegment(mLastWritten);
+ segment = (*it);
+ S32 size = segment.size();
+ U8* data = segment.data();
+ if((data + size) == mLastWritten)
+ {
+ ++it;
+ segment = (*it);
+ }
+ else
+ {
+ // *FIX: check the math on this one
+ segment = LLSegment(
+ (*it).getChannelMask(),
+ mLastWritten + 1,
+ size - (mLastWritten - data));
+ }
+ }
+ */
+
+ PUMP_DEBUG;
+ apr_size_t len;
+ bool done = false;
+ apr_status_t status = APR_SUCCESS;
+ while(it != end)
+ {
+
+ PUMP_DEBUG;
+ if((*it).isOnChannel(channels.in()))
+ {
+ PUMP_DEBUG;
+ len = (apr_size_t)segment.size();
+ status = apr_socket_send(
+ mDestination->getSocket(),
+ (const char*)segment.data(),
+ &len);
+ // We sometimes get a 'non-blocking socket operation could not be
+ // completed immediately' error from apr_socket_send. In this
+ // case we break and the data will be sent the next time the chain
+ // is pumped.
+ if(APR_STATUS_IS_EAGAIN(status))
+ {
+ ll_apr_warn_status(status);
+ break;
+ }
+
+ mLastWritten = segment.data() + len - 1;
+
+ PUMP_DEBUG;
+ if((S32)len < segment.size())
+ {
+ break;
+ }
+
+ }
+
+ ++it;
+ if(it != end)
+ {
+ segment = (*it);
+ }
+ else
+ {
+ done = true;
+ }
+
+ }
+ buffer->unlock();
+
+ PUMP_DEBUG;
+ if(done && eos)
+ {
+ return STATUS_DONE;
+ }
+ return STATUS_OK;
}
@@ -532,166 +532,166 @@ LLIOPipe::EStatus LLIOSocketWriter::process_impl(
///
LLIOServerSocket::LLIOServerSocket(
- apr_pool_t* pool,
- LLIOServerSocket::socket_t listener,
- factory_t factory) :
- mPool(pool),
- mListenSocket(listener),
- mReactor(factory),
- mInitialized(false),
- mResponseTimeout(DEFAULT_CHAIN_EXPIRY_SECS)
+ apr_pool_t* pool,
+ LLIOServerSocket::socket_t listener,
+ factory_t factory) :
+ mPool(pool),
+ mListenSocket(listener),
+ mReactor(factory),
+ mInitialized(false),
+ mResponseTimeout(DEFAULT_CHAIN_EXPIRY_SECS)
{
}
LLIOServerSocket::~LLIOServerSocket()
{
- //LL_DEBUGS() << "Destroying LLIOServerSocket" << LL_ENDL;
+ //LL_DEBUGS() << "Destroying LLIOServerSocket" << LL_ENDL;
}
void LLIOServerSocket::setResponseTimeout(F32 timeout_secs)
{
- mResponseTimeout = timeout_secs;
+ mResponseTimeout = timeout_secs;
}
// virtual
LLIOPipe::EStatus LLIOServerSocket::process_impl(
- const LLChannelDescriptors& channels,
- buffer_ptr_t& buffer,
- bool& eos,
- LLSD& context,
- LLPumpIO* pump)
+ const LLChannelDescriptors& channels,
+ buffer_ptr_t& buffer,
+ bool& eos,
+ LLSD& context,
+ LLPumpIO* pump)
{
LL_PROFILE_ZONE_SCOPED;
- PUMP_DEBUG;
- if(!pump)
- {
- LL_WARNS() << "Need a pump for server socket." << LL_ENDL;
- return STATUS_ERROR;
- }
- if(!mInitialized)
- {
- PUMP_DEBUG;
- // This segment sets up the pump so that we do not call
- // process again until we have an incoming read, aka connect()
- // from a remote host.
- LL_DEBUGS() << "Initializing poll descriptor for LLIOServerSocket."
- << LL_ENDL;
- apr_pollfd_t poll_fd;
- poll_fd.p = NULL;
- poll_fd.desc_type = APR_POLL_SOCKET;
- poll_fd.reqevents = APR_POLLIN;
- poll_fd.rtnevents = 0x0;
- poll_fd.desc.s = mListenSocket->getSocket();
- poll_fd.client_data = NULL;
- pump->setConditional(this, &poll_fd);
- mInitialized = true;
- return STATUS_OK;
- }
-
- // we are initialized, and told to process, so we must have a
- // socket waiting for a connection.
- LL_DEBUGS() << "accepting socket" << LL_ENDL;
-
- PUMP_DEBUG;
- apr_pool_t* new_pool = NULL;
- apr_status_t status = apr_pool_create(&new_pool, mPool);
- if(ll_apr_warn_status(status))
- {
- if(new_pool)
- {
- apr_pool_destroy(new_pool);
- }
- return STATUS_ERROR;
- }
-
- apr_socket_t* socket = NULL;
- status = apr_socket_accept(
- &socket,
- mListenSocket->getSocket(),
- new_pool);
- LLSocket::ptr_t llsocket(LLSocket::create(socket, new_pool));
- //EStatus rv = STATUS_ERROR;
- if(llsocket)
- {
- PUMP_DEBUG;
-
- apr_sockaddr_t* remote_addr;
- apr_socket_addr_get(&remote_addr, APR_REMOTE, socket);
-
- char* remote_host_string;
- apr_sockaddr_ip_get(&remote_host_string, remote_addr);
-
- LLSD context;
- context[CONTEXT_REMOTE_HOST] = remote_host_string;
- context[CONTEXT_REMOTE_PORT] = remote_addr->port;
-
- LLPumpIO::chain_t chain;
- chain.push_back(LLIOPipe::ptr_t(new LLIOSocketReader(llsocket)));
- if(mReactor->build(chain, context))
- {
- chain.push_back(LLIOPipe::ptr_t(new LLIOSocketWriter(llsocket)));
- pump->addChain(chain, mResponseTimeout);
- status = STATUS_OK;
- }
- else
- {
- LL_WARNS() << "Unable to build reactor to socket." << LL_ENDL;
- }
- }
- else
- {
- LL_WARNS() << "Unable to create linden socket." << LL_ENDL;
- }
-
- PUMP_DEBUG;
- // This needs to always return success, lest it get removed from
- // the pump.
- return STATUS_OK;
+ PUMP_DEBUG;
+ if(!pump)
+ {
+ LL_WARNS() << "Need a pump for server socket." << LL_ENDL;
+ return STATUS_ERROR;
+ }
+ if(!mInitialized)
+ {
+ PUMP_DEBUG;
+ // This segment sets up the pump so that we do not call
+ // process again until we have an incoming read, aka connect()
+ // from a remote host.
+ LL_DEBUGS() << "Initializing poll descriptor for LLIOServerSocket."
+ << LL_ENDL;
+ apr_pollfd_t poll_fd;
+ poll_fd.p = NULL;
+ poll_fd.desc_type = APR_POLL_SOCKET;
+ poll_fd.reqevents = APR_POLLIN;
+ poll_fd.rtnevents = 0x0;
+ poll_fd.desc.s = mListenSocket->getSocket();
+ poll_fd.client_data = NULL;
+ pump->setConditional(this, &poll_fd);
+ mInitialized = true;
+ return STATUS_OK;
+ }
+
+ // we are initialized, and told to process, so we must have a
+ // socket waiting for a connection.
+ LL_DEBUGS() << "accepting socket" << LL_ENDL;
+
+ PUMP_DEBUG;
+ apr_pool_t* new_pool = NULL;
+ apr_status_t status = apr_pool_create(&new_pool, mPool);
+ if(ll_apr_warn_status(status))
+ {
+ if(new_pool)
+ {
+ apr_pool_destroy(new_pool);
+ }
+ return STATUS_ERROR;
+ }
+
+ apr_socket_t* socket = NULL;
+ status = apr_socket_accept(
+ &socket,
+ mListenSocket->getSocket(),
+ new_pool);
+ LLSocket::ptr_t llsocket(LLSocket::create(socket, new_pool));
+ //EStatus rv = STATUS_ERROR;
+ if(llsocket)
+ {
+ PUMP_DEBUG;
+
+ apr_sockaddr_t* remote_addr;
+ apr_socket_addr_get(&remote_addr, APR_REMOTE, socket);
+
+ char* remote_host_string;
+ apr_sockaddr_ip_get(&remote_host_string, remote_addr);
+
+ LLSD context;
+ context[CONTEXT_REMOTE_HOST] = remote_host_string;
+ context[CONTEXT_REMOTE_PORT] = remote_addr->port;
+
+ LLPumpIO::chain_t chain;
+ chain.push_back(LLIOPipe::ptr_t(new LLIOSocketReader(llsocket)));
+ if(mReactor->build(chain, context))
+ {
+ chain.push_back(LLIOPipe::ptr_t(new LLIOSocketWriter(llsocket)));
+ pump->addChain(chain, mResponseTimeout);
+ status = STATUS_OK;
+ }
+ else
+ {
+ LL_WARNS() << "Unable to build reactor to socket." << LL_ENDL;
+ }
+ }
+ else
+ {
+ LL_WARNS() << "Unable to create linden socket." << LL_ENDL;
+ }
+
+ PUMP_DEBUG;
+ // This needs to always return success, lest it get removed from
+ // the pump.
+ return STATUS_OK;
}
#if 0
LLIODataSocket::LLIODataSocket(
- U16 suggested_port,
- U16 start_discovery_port,
- apr_pool_t* pool) :
- mSocket(NULL)
+ U16 suggested_port,
+ U16 start_discovery_port,
+ apr_pool_t* pool) :
+ mSocket(NULL)
{
- if(!pool || (PORT_INVALID == suggested_port)) return;
- if(ll_apr_warn_status(apr_socket_create(&mSocket, APR_INET, SOCK_DGRAM, APR_PROTO_UDP, pool))) return;
- apr_sockaddr_t* sa = NULL;
- if(ll_apr_warn_status(apr_sockaddr_info_get(&sa, APR_ANYADDR, APR_UNSPEC, suggested_port, 0, pool))) return;
- apr_status_t status = apr_socket_bind(mSocket, sa);
- if((start_discovery_port > 0) && is_addr_in_use(status))
- {
- const U16 MAX_ATTEMPT_PORTS = 50;
- for(U16 attempt_port = start_discovery_port;
- attempt_port < (start_discovery_port + MAX_ATTEMPT_PORTS);
- ++attempt_port)
- {
- sa->port = attempt_port;
- sa->sa.sin.sin_port = htons(attempt_port);
- status = apr_socket_bind(mSocket, sa);
- if(APR_SUCCESS == status) break;
- if(is_addr_in_use(status)) continue;
- (void)ll_apr_warn_status(status);
- }
- }
- if(ll_apr_warn_status(status)) return;
- if(sa->port)
- {
- LL_DEBUGS() << "Bound datagram socket to port: " << sa->port << LL_ENDL;
- mPort = sa->port;
- }
- else
- {
- mPort = LLIOSocket::PORT_EPHEMERAL;
- }
-
- // set up the socket options options
- ll_apr_warn_status(apr_socket_timeout_set(mSocket, 0));
- ll_apr_warn_status(apr_socket_opt_set(mSocket, APR_SO_SNDBUF, LL_SEND_BUFFER_SIZE));
- ll_apr_warn_status(apr_socket_opt_set(mSocket, APR_SO_RCVBUF, LL_RECV_BUFFER_SIZE));
+ if(!pool || (PORT_INVALID == suggested_port)) return;
+ if(ll_apr_warn_status(apr_socket_create(&mSocket, APR_INET, SOCK_DGRAM, APR_PROTO_UDP, pool))) return;
+ apr_sockaddr_t* sa = NULL;
+ if(ll_apr_warn_status(apr_sockaddr_info_get(&sa, APR_ANYADDR, APR_UNSPEC, suggested_port, 0, pool))) return;
+ apr_status_t status = apr_socket_bind(mSocket, sa);
+ if((start_discovery_port > 0) && is_addr_in_use(status))
+ {
+ const U16 MAX_ATTEMPT_PORTS = 50;
+ for(U16 attempt_port = start_discovery_port;
+ attempt_port < (start_discovery_port + MAX_ATTEMPT_PORTS);
+ ++attempt_port)
+ {
+ sa->port = attempt_port;
+ sa->sa.sin.sin_port = htons(attempt_port);
+ status = apr_socket_bind(mSocket, sa);
+ if(APR_SUCCESS == status) break;
+ if(is_addr_in_use(status)) continue;
+ (void)ll_apr_warn_status(status);
+ }
+ }
+ if(ll_apr_warn_status(status)) return;
+ if(sa->port)
+ {
+ LL_DEBUGS() << "Bound datagram socket to port: " << sa->port << LL_ENDL;
+ mPort = sa->port;
+ }
+ else
+ {
+ mPort = LLIOSocket::PORT_EPHEMERAL;
+ }
+
+ // set up the socket options options
+ ll_apr_warn_status(apr_socket_timeout_set(mSocket, 0));
+ ll_apr_warn_status(apr_socket_opt_set(mSocket, APR_SO_SNDBUF, LL_SEND_BUFFER_SIZE));
+ ll_apr_warn_status(apr_socket_opt_set(mSocket, APR_SO_RCVBUF, LL_RECV_BUFFER_SIZE));
}
LLIODataSocket::~LLIODataSocket()