diff options
author | Andrey Lihatskiy <alihatskiy@productengine.com> | 2024-05-15 11:16:27 +0300 |
---|---|---|
committer | Andrey Lihatskiy <alihatskiy@productengine.com> | 2024-05-15 11:16:27 +0300 |
commit | bccc10db9a90d365c353baebf443fde2030ce970 (patch) | |
tree | 2c2e1fd94b29667a809f8d7285d049f5ff5d424d /indra/llmessage/lliosocket.cpp | |
parent | 531cd34f670170ade57f8813fe48012b61a1d3c2 (diff) | |
parent | bb3c36f5cbc0c3b542045fd27255eee24e03da22 (diff) |
Merge branch 'main' into marchcat/x-b-merge
# Conflicts:
# autobuild.xml
# indra/cmake/ConfigurePkgConfig.cmake
# indra/cmake/ICU4C.cmake
# indra/media_plugins/gstreamer010/llmediaimplgstreamer_syms.cpp
# indra/media_plugins/gstreamer010/llmediaimplgstreamer_syms.h
# indra/media_plugins/gstreamer010/llmediaimplgstreamertriviallogging.h
# indra/media_plugins/gstreamer010/llmediaimplgstreamervidplug.cpp
# indra/media_plugins/gstreamer010/llmediaimplgstreamervidplug.h
# indra/media_plugins/gstreamer010/media_plugin_gstreamer010.cpp
# indra/newview/llappviewerlinux_api.h
# indra/newview/llappviewerlinux_api_dbus.cpp
# indra/newview/llappviewerlinux_api_dbus.h
# indra/newview/llfloateremojipicker.cpp
# indra/newview/lloutfitslist.cpp
Diffstat (limited to 'indra/llmessage/lliosocket.cpp')
-rw-r--r-- | indra/llmessage/lliosocket.cpp | 1034 |
1 files changed, 517 insertions, 517 deletions
diff --git a/indra/llmessage/lliosocket.cpp b/indra/llmessage/lliosocket.cpp index 321d7286eb..a14d10fe5f 100644 --- a/indra/llmessage/lliosocket.cpp +++ b/indra/llmessage/lliosocket.cpp @@ -1,4 +1,4 @@ -/** +/** * @file lliosocket.cpp * @author Phoenix * @date 2005-07-31 @@ -7,21 +7,21 @@ * $LicenseInfo:firstyear=2005&license=viewerlgpl$ * Second Life Viewer Source Code * Copyright (C) 2010, Linden Research, Inc. - * + * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; * version 2.1 of the License only. - * + * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. - * + * * You should have received a copy of the GNU Lesser General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA - * + * * Linden Research, Inc., 945 Battery Street, San Francisco, CA 94111 USA * $/LicenseInfo$ */ @@ -50,15 +50,15 @@ static const S32 LL_RECV_BUFFER_SIZE = 40000; //static const U16 LL_PORT_DISCOVERY_RANGE_MAX = 13050; // -// local methods +// local methods // bool is_addr_in_use(apr_status_t status) { #if LL_WINDOWS - return (WSAEADDRINUSE == APR_TO_OS_ERROR(status)); + return (WSAEADDRINUSE == APR_TO_OS_ERROR(status)); #else - return (EADDRINUSE == APR_TO_OS_ERROR(status)); + return (EADDRINUSE == APR_TO_OS_ERROR(status)); #endif } @@ -71,28 +71,28 @@ bool is_addr_in_use(apr_status_t status) #endif -// Quick function +// Quick function void ll_debug_socket(const char* msg, apr_socket_t* apr_sock) { #if LL_DEBUG_SOCKET_FILE_DESCRIPTORS - if(!apr_sock) - { - LL_DEBUGS("Socket") << "Socket -- " << (msg?msg:"") << ": no socket." << LL_ENDL; - return; - } - // *TODO: Why doesn't this work? - //apr_os_sock_t os_sock; - int os_sock; - if(APR_SUCCESS == apr_os_sock_get(&os_sock, apr_sock)) - { - LL_DEBUGS("Socket") << "Socket -- " << (msg?msg:"") << " on fd " << os_sock - << " at " << apr_sock << LL_ENDL; - } - else - { - LL_DEBUGS("Socket") << "Socket -- " << (msg?msg:"") << " no fd " - << " at " << apr_sock << LL_ENDL; - } + if(!apr_sock) + { + LL_DEBUGS("Socket") << "Socket -- " << (msg?msg:"") << ": no socket." << LL_ENDL; + return; + } + // *TODO: Why doesn't this work? + //apr_os_sock_t os_sock; + int os_sock; + if(APR_SUCCESS == apr_os_sock_get(&os_sock, apr_sock)) + { + LL_DEBUGS("Socket") << "Socket -- " << (msg?msg:"") << " on fd " << os_sock + << " at " << apr_sock << LL_ENDL; + } + else + { + LL_DEBUGS("Socket") << "Socket -- " << (msg?msg:"") << " no fd " + << " at " << apr_sock << LL_ENDL; + } #endif } @@ -103,162 +103,162 @@ void ll_debug_socket(const char* msg, apr_socket_t* apr_sock) // static LLSocket::ptr_t LLSocket::create(apr_pool_t* pool, EType type, U16 port, const char *hostname) { - LLSocket::ptr_t rv; - apr_socket_t* socket = NULL; - apr_pool_t* new_pool = NULL; - apr_status_t status = APR_EGENERAL; - - // create a pool for the socket - status = apr_pool_create(&new_pool, pool); - if(ll_apr_warn_status(status)) - { - if(new_pool) apr_pool_destroy(new_pool); - return rv; - } - - if(STREAM_TCP == type) - { - status = apr_socket_create( - &socket, - APR_INET, - SOCK_STREAM, - APR_PROTO_TCP, - new_pool); - } - else if(DATAGRAM_UDP == type) - { - status = apr_socket_create( - &socket, - APR_INET, - SOCK_DGRAM, - APR_PROTO_UDP, - new_pool); - } - else - { - if(new_pool) apr_pool_destroy(new_pool); - return rv; - } - if(ll_apr_warn_status(status)) - { - if(new_pool) apr_pool_destroy(new_pool); - return rv; - } - // At this point, the new LLSocket instance takes ownership of new_pool, - // which is why no early return below this call explicitly destroys it: it - // is instead cleaned up by ~LLSocket(). - rv = ptr_t(new LLSocket(socket, new_pool)); - if(port > 0) - { - apr_sockaddr_t* sa = NULL; - status = apr_sockaddr_info_get( - &sa, - hostname, - APR_UNSPEC, - port, - 0, - new_pool); - if(ll_apr_warn_status(status)) - { - rv.reset(); - return rv; - } - // This allows us to reuse the address on quick down/up. This - // is unlikely to create problems. - ll_apr_warn_status(apr_socket_opt_set(socket, APR_SO_REUSEADDR, 1)); - status = apr_socket_bind(socket, sa); - if(ll_apr_warn_status(status)) - { - rv.reset(); - return rv; - } - LL_DEBUGS() << "Bound " << ((DATAGRAM_UDP == type) ? "udp" : "tcp") - << " socket to port: " << sa->port << LL_ENDL; - if(STREAM_TCP == type) - { - // If it's a stream based socket, we need to tell the OS - // to keep a queue of incoming connections for ACCEPT. - LL_DEBUGS() << "Setting listen state for socket." << LL_ENDL; - status = apr_socket_listen( - socket, - LL_DEFAULT_LISTEN_BACKLOG); - if(ll_apr_warn_status(status)) - { - rv.reset(); - return rv; - } - } - } - else // port <= 0 - { - // we need to indicate that we have an ephemeral port if the - // previous calls were successful. It will - port = PORT_EPHEMERAL; - } - rv->mPort = port; - rv->setNonBlocking(); - return rv; + LLSocket::ptr_t rv; + apr_socket_t* socket = NULL; + apr_pool_t* new_pool = NULL; + apr_status_t status = APR_EGENERAL; + + // create a pool for the socket + status = apr_pool_create(&new_pool, pool); + if(ll_apr_warn_status(status)) + { + if(new_pool) apr_pool_destroy(new_pool); + return rv; + } + + if(STREAM_TCP == type) + { + status = apr_socket_create( + &socket, + APR_INET, + SOCK_STREAM, + APR_PROTO_TCP, + new_pool); + } + else if(DATAGRAM_UDP == type) + { + status = apr_socket_create( + &socket, + APR_INET, + SOCK_DGRAM, + APR_PROTO_UDP, + new_pool); + } + else + { + if(new_pool) apr_pool_destroy(new_pool); + return rv; + } + if(ll_apr_warn_status(status)) + { + if(new_pool) apr_pool_destroy(new_pool); + return rv; + } + // At this point, the new LLSocket instance takes ownership of new_pool, + // which is why no early return below this call explicitly destroys it: it + // is instead cleaned up by ~LLSocket(). + rv = ptr_t(new LLSocket(socket, new_pool)); + if(port > 0) + { + apr_sockaddr_t* sa = NULL; + status = apr_sockaddr_info_get( + &sa, + hostname, + APR_UNSPEC, + port, + 0, + new_pool); + if(ll_apr_warn_status(status)) + { + rv.reset(); + return rv; + } + // This allows us to reuse the address on quick down/up. This + // is unlikely to create problems. + ll_apr_warn_status(apr_socket_opt_set(socket, APR_SO_REUSEADDR, 1)); + status = apr_socket_bind(socket, sa); + if(ll_apr_warn_status(status)) + { + rv.reset(); + return rv; + } + LL_DEBUGS() << "Bound " << ((DATAGRAM_UDP == type) ? "udp" : "tcp") + << " socket to port: " << sa->port << LL_ENDL; + if(STREAM_TCP == type) + { + // If it's a stream based socket, we need to tell the OS + // to keep a queue of incoming connections for ACCEPT. + LL_DEBUGS() << "Setting listen state for socket." << LL_ENDL; + status = apr_socket_listen( + socket, + LL_DEFAULT_LISTEN_BACKLOG); + if(ll_apr_warn_status(status)) + { + rv.reset(); + return rv; + } + } + } + else // port <= 0 + { + // we need to indicate that we have an ephemeral port if the + // previous calls were successful. It will + port = PORT_EPHEMERAL; + } + rv->mPort = port; + rv->setNonBlocking(); + return rv; } // static LLSocket::ptr_t LLSocket::create(apr_socket_t* socket, apr_pool_t* pool) { - LLSocket::ptr_t rv; - if(!socket) - { - return rv; - } - rv = ptr_t(new LLSocket(socket, pool)); - rv->mPort = PORT_EPHEMERAL; - rv->setNonBlocking(); - return rv; + LLSocket::ptr_t rv; + if(!socket) + { + return rv; + } + rv = ptr_t(new LLSocket(socket, pool)); + rv->mPort = PORT_EPHEMERAL; + rv->setNonBlocking(); + return rv; } bool LLSocket::blockingConnect(const LLHost& host) { - if(!mSocket) return false; - apr_sockaddr_t* sa = NULL; - std::string ip_address; - ip_address = host.getIPString(); - if(ll_apr_warn_status(apr_sockaddr_info_get( - &sa, - ip_address.c_str(), - APR_UNSPEC, - host.getPort(), - 0, - mPool))) - { - return false; - } - setBlocking(1000); - ll_debug_socket("Blocking connect", mSocket); - if(ll_apr_warn_status(apr_socket_connect(mSocket, sa))) return false; - setNonBlocking(); - return true; + if(!mSocket) return false; + apr_sockaddr_t* sa = NULL; + std::string ip_address; + ip_address = host.getIPString(); + if(ll_apr_warn_status(apr_sockaddr_info_get( + &sa, + ip_address.c_str(), + APR_UNSPEC, + host.getPort(), + 0, + mPool))) + { + return false; + } + setBlocking(1000); + ll_debug_socket("Blocking connect", mSocket); + if(ll_apr_warn_status(apr_socket_connect(mSocket, sa))) return false; + setNonBlocking(); + return true; } LLSocket::LLSocket(apr_socket_t* socket, apr_pool_t* pool) : - mSocket(socket), - mPool(pool), - mPort(PORT_INVALID) + mSocket(socket), + mPool(pool), + mPort(PORT_INVALID) { - ll_debug_socket("Constructing wholely formed socket", mSocket); + ll_debug_socket("Constructing wholely formed socket", mSocket); } LLSocket::~LLSocket() { - // *FIX: clean up memory we are holding. - if(mSocket) - { - ll_debug_socket("Destroying socket", mSocket); - apr_socket_close(mSocket); - mSocket = NULL; - } - if(mPool) - { - apr_pool_destroy(mPool); - } + // *FIX: clean up memory we are holding. + if(mSocket) + { + ll_debug_socket("Destroying socket", mSocket); + apr_socket_close(mSocket); + mSocket = NULL; + } + if(mPool) + { + apr_pool_destroy(mPool); + } } // See http://dev.ariel-networks.com/apr/apr-tutorial/html/apr-tutorial-13.html#ss13.4 @@ -267,21 +267,21 @@ LLSocket::~LLSocket() void LLSocket::setBlocking(S32 timeout) { - // set up the socket options - ll_apr_warn_status(apr_socket_timeout_set(mSocket, timeout)); // Sets both receive and send timeout SO_RCVTIMEO, SO_SNDTIMEO - ll_apr_warn_status(apr_socket_opt_set(mSocket, APR_SO_NONBLOCK, 0)); - ll_apr_warn_status(apr_socket_opt_set(mSocket, APR_SO_SNDBUF, LL_SEND_BUFFER_SIZE)); - ll_apr_warn_status(apr_socket_opt_set(mSocket, APR_SO_RCVBUF, LL_RECV_BUFFER_SIZE)); + // set up the socket options + ll_apr_warn_status(apr_socket_timeout_set(mSocket, timeout)); // Sets both receive and send timeout SO_RCVTIMEO, SO_SNDTIMEO + ll_apr_warn_status(apr_socket_opt_set(mSocket, APR_SO_NONBLOCK, 0)); + ll_apr_warn_status(apr_socket_opt_set(mSocket, APR_SO_SNDBUF, LL_SEND_BUFFER_SIZE)); + ll_apr_warn_status(apr_socket_opt_set(mSocket, APR_SO_RCVBUF, LL_RECV_BUFFER_SIZE)); } void LLSocket::setNonBlocking() { - // set up the socket options - ll_apr_warn_status(apr_socket_timeout_set(mSocket, 0)); - ll_apr_warn_status(apr_socket_opt_set(mSocket, APR_SO_NONBLOCK, 1)); - ll_apr_warn_status(apr_socket_opt_set(mSocket, APR_SO_SNDBUF, LL_SEND_BUFFER_SIZE)); - ll_apr_warn_status(apr_socket_opt_set(mSocket, APR_SO_RCVBUF, LL_RECV_BUFFER_SIZE)); + // set up the socket options + ll_apr_warn_status(apr_socket_timeout_set(mSocket, 0)); + ll_apr_warn_status(apr_socket_opt_set(mSocket, APR_SO_NONBLOCK, 1)); + ll_apr_warn_status(apr_socket_opt_set(mSocket, APR_SO_SNDBUF, LL_SEND_BUFFER_SIZE)); + ll_apr_warn_status(apr_socket_opt_set(mSocket, APR_SO_RCVBUF, LL_RECV_BUFFER_SIZE)); } @@ -290,96 +290,96 @@ void LLSocket::setNonBlocking() /// LLIOSocketReader::LLIOSocketReader(LLSocket::ptr_t socket) : - mSource(socket), - mInitialized(false) + mSource(socket), + mInitialized(false) { } LLIOSocketReader::~LLIOSocketReader() { - //LL_DEBUGS() << "Destroying LLIOSocketReader" << LL_ENDL; + //LL_DEBUGS() << "Destroying LLIOSocketReader" << LL_ENDL; } // virtual LLIOPipe::EStatus LLIOSocketReader::process_impl( - const LLChannelDescriptors& channels, - buffer_ptr_t& buffer, - bool& eos, - LLSD& context, - LLPumpIO* pump) + const LLChannelDescriptors& channels, + buffer_ptr_t& buffer, + bool& eos, + LLSD& context, + LLPumpIO* pump) { LL_PROFILE_ZONE_SCOPED; - PUMP_DEBUG; - if(!mSource) return STATUS_PRECONDITION_NOT_MET; - if(!mInitialized) - { - PUMP_DEBUG; - // Since the read will not block, it's ok to initialize and - // attempt to read off the descriptor immediately. - mInitialized = true; - if(pump) - { - PUMP_DEBUG; - LL_DEBUGS() << "Initializing poll descriptor for LLIOSocketReader." - << LL_ENDL; - apr_pollfd_t poll_fd; - poll_fd.p = NULL; - poll_fd.desc_type = APR_POLL_SOCKET; - poll_fd.reqevents = APR_POLLIN; - poll_fd.rtnevents = 0x0; - poll_fd.desc.s = mSource->getSocket(); - poll_fd.client_data = NULL; - pump->setConditional(this, &poll_fd); - } - } - //if(!buffer) - //{ - // buffer = new LLBufferArray; - //} - PUMP_DEBUG; - const apr_size_t READ_BUFFER_SIZE = 1024; - char read_buf[READ_BUFFER_SIZE]; /*Flawfinder: ignore*/ - apr_size_t len; - apr_status_t status = APR_SUCCESS; - do - { - PUMP_DEBUG; - len = READ_BUFFER_SIZE; - status = apr_socket_recv(mSource->getSocket(), read_buf, &len); - buffer->append(channels.out(), (U8*)read_buf, len); - } while((APR_SUCCESS == status) && (READ_BUFFER_SIZE == len)); - LL_DEBUGS() << "socket read status: " << status << LL_ENDL; - LLIOPipe::EStatus rv = STATUS_OK; - - PUMP_DEBUG; - // *FIX: Also need to check for broken pipe - if(APR_STATUS_IS_EOF(status)) - { - // *FIX: Should we shut down the socket read? - if(pump) - { - pump->setConditional(this, NULL); - } - rv = STATUS_DONE; - eos = true; - } - else if(APR_STATUS_IS_EAGAIN(status)) - { + PUMP_DEBUG; + if(!mSource) return STATUS_PRECONDITION_NOT_MET; + if(!mInitialized) + { + PUMP_DEBUG; + // Since the read will not block, it's ok to initialize and + // attempt to read off the descriptor immediately. + mInitialized = true; + if(pump) + { + PUMP_DEBUG; + LL_DEBUGS() << "Initializing poll descriptor for LLIOSocketReader." + << LL_ENDL; + apr_pollfd_t poll_fd; + poll_fd.p = NULL; + poll_fd.desc_type = APR_POLL_SOCKET; + poll_fd.reqevents = APR_POLLIN; + poll_fd.rtnevents = 0x0; + poll_fd.desc.s = mSource->getSocket(); + poll_fd.client_data = NULL; + pump->setConditional(this, &poll_fd); + } + } + //if(!buffer) + //{ + // buffer = new LLBufferArray; + //} + PUMP_DEBUG; + const apr_size_t READ_BUFFER_SIZE = 1024; + char read_buf[READ_BUFFER_SIZE]; /*Flawfinder: ignore*/ + apr_size_t len; + apr_status_t status = APR_SUCCESS; + do + { + PUMP_DEBUG; + len = READ_BUFFER_SIZE; + status = apr_socket_recv(mSource->getSocket(), read_buf, &len); + buffer->append(channels.out(), (U8*)read_buf, len); + } while((APR_SUCCESS == status) && (READ_BUFFER_SIZE == len)); + LL_DEBUGS() << "socket read status: " << status << LL_ENDL; + LLIOPipe::EStatus rv = STATUS_OK; + + PUMP_DEBUG; + // *FIX: Also need to check for broken pipe + if(APR_STATUS_IS_EOF(status)) + { + // *FIX: Should we shut down the socket read? + if(pump) + { + pump->setConditional(this, NULL); + } + rv = STATUS_DONE; + eos = true; + } + else if(APR_STATUS_IS_EAGAIN(status)) + { /*Commented out by Aura 9-9-8 for DEV-19961. - // everything is fine, but we can terminate this process pump. - - rv = STATUS_BREAK; + // everything is fine, but we can terminate this process pump. + + rv = STATUS_BREAK; */ - } - else - { - if(ll_apr_warn_status(status)) - { - rv = STATUS_ERROR; - } - } - PUMP_DEBUG; - return rv; + } + else + { + if(ll_apr_warn_status(status)) + { + rv = STATUS_ERROR; + } + } + PUMP_DEBUG; + return rv; } /// @@ -387,143 +387,143 @@ LLIOPipe::EStatus LLIOSocketReader::process_impl( /// LLIOSocketWriter::LLIOSocketWriter(LLSocket::ptr_t socket) : - mDestination(socket), - mLastWritten(NULL), - mInitialized(false) + mDestination(socket), + mLastWritten(NULL), + mInitialized(false) { } LLIOSocketWriter::~LLIOSocketWriter() { - //LL_DEBUGS() << "Destroying LLIOSocketWriter" << LL_ENDL; + //LL_DEBUGS() << "Destroying LLIOSocketWriter" << LL_ENDL; } // virtual LLIOPipe::EStatus LLIOSocketWriter::process_impl( - const LLChannelDescriptors& channels, - buffer_ptr_t& buffer, - bool& eos, - LLSD& context, - LLPumpIO* pump) + const LLChannelDescriptors& channels, + buffer_ptr_t& buffer, + bool& eos, + LLSD& context, + LLPumpIO* pump) { LL_PROFILE_ZONE_SCOPED; - PUMP_DEBUG; - if(!mDestination) return STATUS_PRECONDITION_NOT_MET; - if(!mInitialized) - { - PUMP_DEBUG; - // Since the write will not block, it's ok to initialize and - // attempt to write immediately. - mInitialized = true; - if(pump) - { - PUMP_DEBUG; - LL_DEBUGS() << "Initializing poll descriptor for LLIOSocketWriter." - << LL_ENDL; - apr_pollfd_t poll_fd; - poll_fd.p = NULL; - poll_fd.desc_type = APR_POLL_SOCKET; - poll_fd.reqevents = APR_POLLOUT; - poll_fd.rtnevents = 0x0; - poll_fd.desc.s = mDestination->getSocket(); - poll_fd.client_data = NULL; - pump->setConditional(this, &poll_fd); - } - } - - PUMP_DEBUG; - // *FIX: Some sort of writev implementation would be much more - // efficient - not only because writev() is better, but also - // because we won't have to do as much work to find the start - // address. - buffer->lock(); - LLBufferArray::segment_iterator_t it; - LLBufferArray::segment_iterator_t end = buffer->endSegment(); - LLSegment segment; - it = buffer->constructSegmentAfter(mLastWritten, segment); - /* - if(NULL == mLastWritten) - { - it = buffer->beginSegment(); - segment = (*it); - } - else - { - it = buffer->getSegment(mLastWritten); - segment = (*it); - S32 size = segment.size(); - U8* data = segment.data(); - if((data + size) == mLastWritten) - { - ++it; - segment = (*it); - } - else - { - // *FIX: check the math on this one - segment = LLSegment( - (*it).getChannelMask(), - mLastWritten + 1, - size - (mLastWritten - data)); - } - } - */ - - PUMP_DEBUG; - apr_size_t len; - bool done = false; - apr_status_t status = APR_SUCCESS; - while(it != end) - { - - PUMP_DEBUG; - if((*it).isOnChannel(channels.in())) - { - PUMP_DEBUG; - len = (apr_size_t)segment.size(); - status = apr_socket_send( - mDestination->getSocket(), - (const char*)segment.data(), - &len); - // We sometimes get a 'non-blocking socket operation could not be - // completed immediately' error from apr_socket_send. In this - // case we break and the data will be sent the next time the chain - // is pumped. - if(APR_STATUS_IS_EAGAIN(status)) - { - ll_apr_warn_status(status); - break; - } - - mLastWritten = segment.data() + len - 1; - - PUMP_DEBUG; - if((S32)len < segment.size()) - { - break; - } - - } - - ++it; - if(it != end) - { - segment = (*it); - } - else - { - done = true; - } - - } - buffer->unlock(); - - PUMP_DEBUG; - if(done && eos) - { - return STATUS_DONE; - } - return STATUS_OK; + PUMP_DEBUG; + if(!mDestination) return STATUS_PRECONDITION_NOT_MET; + if(!mInitialized) + { + PUMP_DEBUG; + // Since the write will not block, it's ok to initialize and + // attempt to write immediately. + mInitialized = true; + if(pump) + { + PUMP_DEBUG; + LL_DEBUGS() << "Initializing poll descriptor for LLIOSocketWriter." + << LL_ENDL; + apr_pollfd_t poll_fd; + poll_fd.p = NULL; + poll_fd.desc_type = APR_POLL_SOCKET; + poll_fd.reqevents = APR_POLLOUT; + poll_fd.rtnevents = 0x0; + poll_fd.desc.s = mDestination->getSocket(); + poll_fd.client_data = NULL; + pump->setConditional(this, &poll_fd); + } + } + + PUMP_DEBUG; + // *FIX: Some sort of writev implementation would be much more + // efficient - not only because writev() is better, but also + // because we won't have to do as much work to find the start + // address. + buffer->lock(); + LLBufferArray::segment_iterator_t it; + LLBufferArray::segment_iterator_t end = buffer->endSegment(); + LLSegment segment; + it = buffer->constructSegmentAfter(mLastWritten, segment); + /* + if(NULL == mLastWritten) + { + it = buffer->beginSegment(); + segment = (*it); + } + else + { + it = buffer->getSegment(mLastWritten); + segment = (*it); + S32 size = segment.size(); + U8* data = segment.data(); + if((data + size) == mLastWritten) + { + ++it; + segment = (*it); + } + else + { + // *FIX: check the math on this one + segment = LLSegment( + (*it).getChannelMask(), + mLastWritten + 1, + size - (mLastWritten - data)); + } + } + */ + + PUMP_DEBUG; + apr_size_t len; + bool done = false; + apr_status_t status = APR_SUCCESS; + while(it != end) + { + + PUMP_DEBUG; + if((*it).isOnChannel(channels.in())) + { + PUMP_DEBUG; + len = (apr_size_t)segment.size(); + status = apr_socket_send( + mDestination->getSocket(), + (const char*)segment.data(), + &len); + // We sometimes get a 'non-blocking socket operation could not be + // completed immediately' error from apr_socket_send. In this + // case we break and the data will be sent the next time the chain + // is pumped. + if(APR_STATUS_IS_EAGAIN(status)) + { + ll_apr_warn_status(status); + break; + } + + mLastWritten = segment.data() + len - 1; + + PUMP_DEBUG; + if((S32)len < segment.size()) + { + break; + } + + } + + ++it; + if(it != end) + { + segment = (*it); + } + else + { + done = true; + } + + } + buffer->unlock(); + + PUMP_DEBUG; + if(done && eos) + { + return STATUS_DONE; + } + return STATUS_OK; } @@ -532,166 +532,166 @@ LLIOPipe::EStatus LLIOSocketWriter::process_impl( /// LLIOServerSocket::LLIOServerSocket( - apr_pool_t* pool, - LLIOServerSocket::socket_t listener, - factory_t factory) : - mPool(pool), - mListenSocket(listener), - mReactor(factory), - mInitialized(false), - mResponseTimeout(DEFAULT_CHAIN_EXPIRY_SECS) + apr_pool_t* pool, + LLIOServerSocket::socket_t listener, + factory_t factory) : + mPool(pool), + mListenSocket(listener), + mReactor(factory), + mInitialized(false), + mResponseTimeout(DEFAULT_CHAIN_EXPIRY_SECS) { } LLIOServerSocket::~LLIOServerSocket() { - //LL_DEBUGS() << "Destroying LLIOServerSocket" << LL_ENDL; + //LL_DEBUGS() << "Destroying LLIOServerSocket" << LL_ENDL; } void LLIOServerSocket::setResponseTimeout(F32 timeout_secs) { - mResponseTimeout = timeout_secs; + mResponseTimeout = timeout_secs; } // virtual LLIOPipe::EStatus LLIOServerSocket::process_impl( - const LLChannelDescriptors& channels, - buffer_ptr_t& buffer, - bool& eos, - LLSD& context, - LLPumpIO* pump) + const LLChannelDescriptors& channels, + buffer_ptr_t& buffer, + bool& eos, + LLSD& context, + LLPumpIO* pump) { LL_PROFILE_ZONE_SCOPED; - PUMP_DEBUG; - if(!pump) - { - LL_WARNS() << "Need a pump for server socket." << LL_ENDL; - return STATUS_ERROR; - } - if(!mInitialized) - { - PUMP_DEBUG; - // This segment sets up the pump so that we do not call - // process again until we have an incoming read, aka connect() - // from a remote host. - LL_DEBUGS() << "Initializing poll descriptor for LLIOServerSocket." - << LL_ENDL; - apr_pollfd_t poll_fd; - poll_fd.p = NULL; - poll_fd.desc_type = APR_POLL_SOCKET; - poll_fd.reqevents = APR_POLLIN; - poll_fd.rtnevents = 0x0; - poll_fd.desc.s = mListenSocket->getSocket(); - poll_fd.client_data = NULL; - pump->setConditional(this, &poll_fd); - mInitialized = true; - return STATUS_OK; - } - - // we are initialized, and told to process, so we must have a - // socket waiting for a connection. - LL_DEBUGS() << "accepting socket" << LL_ENDL; - - PUMP_DEBUG; - apr_pool_t* new_pool = NULL; - apr_status_t status = apr_pool_create(&new_pool, mPool); - if(ll_apr_warn_status(status)) - { - if(new_pool) - { - apr_pool_destroy(new_pool); - } - return STATUS_ERROR; - } - - apr_socket_t* socket = NULL; - status = apr_socket_accept( - &socket, - mListenSocket->getSocket(), - new_pool); - LLSocket::ptr_t llsocket(LLSocket::create(socket, new_pool)); - //EStatus rv = STATUS_ERROR; - if(llsocket) - { - PUMP_DEBUG; - - apr_sockaddr_t* remote_addr; - apr_socket_addr_get(&remote_addr, APR_REMOTE, socket); - - char* remote_host_string; - apr_sockaddr_ip_get(&remote_host_string, remote_addr); - - LLSD context; - context[CONTEXT_REMOTE_HOST] = remote_host_string; - context[CONTEXT_REMOTE_PORT] = remote_addr->port; - - LLPumpIO::chain_t chain; - chain.push_back(LLIOPipe::ptr_t(new LLIOSocketReader(llsocket))); - if(mReactor->build(chain, context)) - { - chain.push_back(LLIOPipe::ptr_t(new LLIOSocketWriter(llsocket))); - pump->addChain(chain, mResponseTimeout); - status = STATUS_OK; - } - else - { - LL_WARNS() << "Unable to build reactor to socket." << LL_ENDL; - } - } - else - { - LL_WARNS() << "Unable to create linden socket." << LL_ENDL; - } - - PUMP_DEBUG; - // This needs to always return success, lest it get removed from - // the pump. - return STATUS_OK; + PUMP_DEBUG; + if(!pump) + { + LL_WARNS() << "Need a pump for server socket." << LL_ENDL; + return STATUS_ERROR; + } + if(!mInitialized) + { + PUMP_DEBUG; + // This segment sets up the pump so that we do not call + // process again until we have an incoming read, aka connect() + // from a remote host. + LL_DEBUGS() << "Initializing poll descriptor for LLIOServerSocket." + << LL_ENDL; + apr_pollfd_t poll_fd; + poll_fd.p = NULL; + poll_fd.desc_type = APR_POLL_SOCKET; + poll_fd.reqevents = APR_POLLIN; + poll_fd.rtnevents = 0x0; + poll_fd.desc.s = mListenSocket->getSocket(); + poll_fd.client_data = NULL; + pump->setConditional(this, &poll_fd); + mInitialized = true; + return STATUS_OK; + } + + // we are initialized, and told to process, so we must have a + // socket waiting for a connection. + LL_DEBUGS() << "accepting socket" << LL_ENDL; + + PUMP_DEBUG; + apr_pool_t* new_pool = NULL; + apr_status_t status = apr_pool_create(&new_pool, mPool); + if(ll_apr_warn_status(status)) + { + if(new_pool) + { + apr_pool_destroy(new_pool); + } + return STATUS_ERROR; + } + + apr_socket_t* socket = NULL; + status = apr_socket_accept( + &socket, + mListenSocket->getSocket(), + new_pool); + LLSocket::ptr_t llsocket(LLSocket::create(socket, new_pool)); + //EStatus rv = STATUS_ERROR; + if(llsocket) + { + PUMP_DEBUG; + + apr_sockaddr_t* remote_addr; + apr_socket_addr_get(&remote_addr, APR_REMOTE, socket); + + char* remote_host_string; + apr_sockaddr_ip_get(&remote_host_string, remote_addr); + + LLSD context; + context[CONTEXT_REMOTE_HOST] = remote_host_string; + context[CONTEXT_REMOTE_PORT] = remote_addr->port; + + LLPumpIO::chain_t chain; + chain.push_back(LLIOPipe::ptr_t(new LLIOSocketReader(llsocket))); + if(mReactor->build(chain, context)) + { + chain.push_back(LLIOPipe::ptr_t(new LLIOSocketWriter(llsocket))); + pump->addChain(chain, mResponseTimeout); + status = STATUS_OK; + } + else + { + LL_WARNS() << "Unable to build reactor to socket." << LL_ENDL; + } + } + else + { + LL_WARNS() << "Unable to create linden socket." << LL_ENDL; + } + + PUMP_DEBUG; + // This needs to always return success, lest it get removed from + // the pump. + return STATUS_OK; } #if 0 LLIODataSocket::LLIODataSocket( - U16 suggested_port, - U16 start_discovery_port, - apr_pool_t* pool) : - mSocket(NULL) + U16 suggested_port, + U16 start_discovery_port, + apr_pool_t* pool) : + mSocket(NULL) { - if(!pool || (PORT_INVALID == suggested_port)) return; - if(ll_apr_warn_status(apr_socket_create(&mSocket, APR_INET, SOCK_DGRAM, APR_PROTO_UDP, pool))) return; - apr_sockaddr_t* sa = NULL; - if(ll_apr_warn_status(apr_sockaddr_info_get(&sa, APR_ANYADDR, APR_UNSPEC, suggested_port, 0, pool))) return; - apr_status_t status = apr_socket_bind(mSocket, sa); - if((start_discovery_port > 0) && is_addr_in_use(status)) - { - const U16 MAX_ATTEMPT_PORTS = 50; - for(U16 attempt_port = start_discovery_port; - attempt_port < (start_discovery_port + MAX_ATTEMPT_PORTS); - ++attempt_port) - { - sa->port = attempt_port; - sa->sa.sin.sin_port = htons(attempt_port); - status = apr_socket_bind(mSocket, sa); - if(APR_SUCCESS == status) break; - if(is_addr_in_use(status)) continue; - (void)ll_apr_warn_status(status); - } - } - if(ll_apr_warn_status(status)) return; - if(sa->port) - { - LL_DEBUGS() << "Bound datagram socket to port: " << sa->port << LL_ENDL; - mPort = sa->port; - } - else - { - mPort = LLIOSocket::PORT_EPHEMERAL; - } - - // set up the socket options options - ll_apr_warn_status(apr_socket_timeout_set(mSocket, 0)); - ll_apr_warn_status(apr_socket_opt_set(mSocket, APR_SO_SNDBUF, LL_SEND_BUFFER_SIZE)); - ll_apr_warn_status(apr_socket_opt_set(mSocket, APR_SO_RCVBUF, LL_RECV_BUFFER_SIZE)); + if(!pool || (PORT_INVALID == suggested_port)) return; + if(ll_apr_warn_status(apr_socket_create(&mSocket, APR_INET, SOCK_DGRAM, APR_PROTO_UDP, pool))) return; + apr_sockaddr_t* sa = NULL; + if(ll_apr_warn_status(apr_sockaddr_info_get(&sa, APR_ANYADDR, APR_UNSPEC, suggested_port, 0, pool))) return; + apr_status_t status = apr_socket_bind(mSocket, sa); + if((start_discovery_port > 0) && is_addr_in_use(status)) + { + const U16 MAX_ATTEMPT_PORTS = 50; + for(U16 attempt_port = start_discovery_port; + attempt_port < (start_discovery_port + MAX_ATTEMPT_PORTS); + ++attempt_port) + { + sa->port = attempt_port; + sa->sa.sin.sin_port = htons(attempt_port); + status = apr_socket_bind(mSocket, sa); + if(APR_SUCCESS == status) break; + if(is_addr_in_use(status)) continue; + (void)ll_apr_warn_status(status); + } + } + if(ll_apr_warn_status(status)) return; + if(sa->port) + { + LL_DEBUGS() << "Bound datagram socket to port: " << sa->port << LL_ENDL; + mPort = sa->port; + } + else + { + mPort = LLIOSocket::PORT_EPHEMERAL; + } + + // set up the socket options options + ll_apr_warn_status(apr_socket_timeout_set(mSocket, 0)); + ll_apr_warn_status(apr_socket_opt_set(mSocket, APR_SO_SNDBUF, LL_SEND_BUFFER_SIZE)); + ll_apr_warn_status(apr_socket_opt_set(mSocket, APR_SO_RCVBUF, LL_RECV_BUFFER_SIZE)); } LLIODataSocket::~LLIODataSocket() |