/** 
 * @file lliosocket.cpp
 * @author Phoenix
 * @date 2005-07-31
 * @brief Sockets declarations for use with the io pipes
 *
 * Copyright (c) 2005-$CurrentYear$, Linden Research, Inc.
 * $License$
 */

#include "linden_common.h"
#include "lliosocket.h"

#include "llapr.h"

#include "llbuffer.h"
#include "llhost.h"
#include "llmemtype.h"
#include "llpumpio.h"

//
// constants
//

// Backlog handed to apr_socket_listen() for bound tcp sockets.
static const S32 LL_DEFAULT_LISTEN_BACKLOG = 10;
// Values applied to APR_SO_SNDBUF / APR_SO_RCVBUF in LLSocket::setOptions().
static const S32 LL_SEND_BUFFER_SIZE = 40000;
static const S32 LL_RECV_BUFFER_SIZE = 40000;
//static const U16 LL_PORT_DISCOVERY_RANGE_MIN = 13000;
//static const U16 LL_PORT_DISCOVERY_RANGE_MAX = 13050;

//
// local methods 
//

/**
 * @brief Check whether an apr status maps to the OS 'address in use' error.
 *
 * @param status The apr status code to inspect.
 * @return Returns true if the OS level error is (WSA)EADDRINUSE.
 */
bool is_addr_in_use(apr_status_t status)
{
#if LL_WINDOWS
	return (WSAEADDRINUSE == APR_TO_OS_ERROR(status));
#else
	return (EADDRINUSE == APR_TO_OS_ERROR(status));
#endif
}

///
/// LLSocket
///

/**
 * @brief Create a tcp or udp socket, optionally bound to a port.
 *
 * A sub-pool of pool is created for the socket and handed to the
 * returned LLSocket. If port is greater than zero the socket is bound
 * to that port on any address, and tcp sockets are additionally put
 * into the listen state. Otherwise the port is recorded as
 * PORT_EPHEMERAL.
 *
 * @param pool The parent apr pool.
 * @param type STREAM_TCP or DATAGRAM_UDP.
 * @param port The port to bind to, or 0 for no bind.
 * @return Returns a valid socket pointer, or a null pointer if any
 * step failed.
 */
// static
LLSocket::ptr_t LLSocket::create(apr_pool_t* pool, EType type, U16 port)
{
	LLMemType m1(LLMemType::MTYPE_IO_TCP);
	LLSocket::ptr_t rv;
	apr_socket_t* socket = NULL;
	apr_pool_t* new_pool = NULL;
	apr_status_t status = APR_EGENERAL;

	// create a pool for the socket
	status = apr_pool_create(&new_pool, pool);
	if(ll_apr_warn_status(status))
	{
		if(new_pool) apr_pool_destroy(new_pool);
		return rv;
	}

	if(STREAM_TCP == type)
	{
		status = apr_socket_create(
			&socket,
			APR_INET,
			SOCK_STREAM,
			APR_PROTO_TCP,
			new_pool);
	}
	else if(DATAGRAM_UDP == type)
	{
		status = apr_socket_create(
			&socket,
			APR_INET,
			SOCK_DGRAM,
			APR_PROTO_UDP,
			new_pool);
	}
	else
	{
		// unknown socket type - nothing was created, so only the
		// pool needs to be released.
		if(new_pool) apr_pool_destroy(new_pool);
		return rv;
	}
	if(ll_apr_warn_status(status))
	{
		if(new_pool) apr_pool_destroy(new_pool);
		return rv;
	}

	// From here on the LLSocket instance holds the socket and the
	// pool, so resetting rv is sufficient to clean up on failure.
	rv = ptr_t(new LLSocket(socket, new_pool));
	if(port > 0)
	{
		apr_sockaddr_t* sa = NULL;
		status = apr_sockaddr_info_get(
			&sa,
			APR_ANYADDR,
			APR_UNSPEC,
			port,
			0,
			new_pool);
		if(ll_apr_warn_status(status))
		{
			rv.reset();
			return rv;
		}
		// This allows us to reuse the address on quick down/up. This
		// is unlikely to create problems.
		ll_apr_warn_status(apr_socket_opt_set(socket, APR_SO_REUSEADDR, 1));
		status = apr_socket_bind(socket, sa);
		if(ll_apr_warn_status(status))
		{
			rv.reset();
			return rv;
		}
		lldebugs << "Bound " << ((DATAGRAM_UDP == type) ? "udp" : "tcp")
				 << " socket to port: " << sa->port << llendl;
		if(STREAM_TCP == type)
		{
			// If it's a stream based socket, we need to tell the OS
			// to keep a queue of incoming connections for ACCEPT.
			lldebugs << "Setting listen state for socket." << llendl;
			status = apr_socket_listen(
				socket,
				LL_DEFAULT_LISTEN_BACKLOG);
			if(ll_apr_warn_status(status))
			{
				rv.reset();
				return rv;
			}
		}
	}
	else
	{
		// No port was requested and the previous calls were
		// successful, so record that this socket has an ephemeral
		// port.
		port = PORT_EPHEMERAL;
	}
	rv->mPort = port;
	rv->setOptions();
	return rv;
}

/**
 * @brief Wrap an already created apr socket.
 *
 * The port is recorded as PORT_EPHEMERAL and the standard options are
 * applied. When socket is NULL nothing is created and pool is left
 * untouched, so the caller is still responsible for it in that case.
 *
 * @param socket The apr socket to wrap.
 * @param pool The apr pool handed to the LLSocket along with socket.
 * @return Returns a valid socket pointer, or a null pointer if socket
 * is NULL.
 */
// static
LLSocket::ptr_t LLSocket::create(apr_socket_t* socket, apr_pool_t* pool)
{
	LLMemType m1(LLMemType::MTYPE_IO_TCP);
	LLSocket::ptr_t rv;
	if(!socket)
	{
		return rv;
	}
	rv = ptr_t(new LLSocket(socket, pool));
	rv->mPort = PORT_EPHEMERAL;
	rv->setOptions();
	return rv;
}

/**
 * @brief Connect this socket to the host.
 *
 * A timeout is set on the socket for the duration of the connect,
 * and setOptions() is called again once the connect succeeded.
 *
 * @param host The host to connect to.
 * @return Returns true if the connect succeeded, false otherwise.
 */
bool LLSocket::blockingConnect(const LLHost& host)
{
	if(!mSocket) return false;
	apr_sockaddr_t* sa = NULL;
	char ip_address[MAXADDRSTR];	/*Flawfinder: ignore*/
	host.getIPString(ip_address, MAXADDRSTR);
	if(ll_apr_warn_status(apr_sockaddr_info_get(
		&sa,
		ip_address,
		APR_UNSPEC,
		host.getPort(),
		0,
		mPool)))
	{
		return false;
	}
	// NOTE(review): apr documents socket timeouts in microseconds,
	// which would make this a 1 millisecond connect timeout - confirm
	// that this is the intended value.
	apr_socket_timeout_set(mSocket, 1000);
	if(ll_apr_warn_status(apr_socket_connect(mSocket, sa))) return false;
	setOptions();
	return true;
}

/**
 * @brief Store the socket and pool. The port starts as PORT_INVALID
 * until one of the create() methods assigns it.
 */
LLSocket::LLSocket(apr_socket_t* socket, apr_pool_t* pool) :
	mSocket(socket),
	mPool(pool),
	mPort(PORT_INVALID)
{
	LLMemType m1(LLMemType::MTYPE_IO_TCP);
}

/**
 * @brief Close the socket and destroy the pool, when present.
 */
LLSocket::~LLSocket()
{
	LLMemType m1(LLMemType::MTYPE_IO_TCP);
	// *FIX: clean up memory we are holding.
	//lldebugs << "Destroying LLSocket" << llendl;
	if(mSocket)
	{
		apr_socket_close(mSocket);
	}
	if(mPool)
	{
		apr_pool_destroy(mPool);
	}
}

/**
 * @brief Apply the standard options to the socket: a timeout of 0 and
 * the send and receive buffer sizes. Failures are only warned about.
 */
void LLSocket::setOptions()
{
	LLMemType m1(LLMemType::MTYPE_IO_TCP);
	// set up the socket options
	ll_apr_warn_status(apr_socket_timeout_set(mSocket, 0));
	ll_apr_warn_status(apr_socket_opt_set(mSocket, APR_SO_SNDBUF, LL_SEND_BUFFER_SIZE));
	ll_apr_warn_status(apr_socket_opt_set(mSocket, APR_SO_RCVBUF, LL_RECV_BUFFER_SIZE));
}

///
/// LLIOSocketReader
///

LLIOSocketReader::LLIOSocketReader(LLSocket::ptr_t socket) :
	mSource(socket),
	mInitialized(false)
{
	LLMemType m1(LLMemType::MTYPE_IO_TCP);
}

LLIOSocketReader::~LLIOSocketReader()
{
	LLMemType m1(LLMemType::MTYPE_IO_TCP);
	//lldebugs << "Destroying LLIOSocketReader" << llendl;
}

/**
 * @brief Read whatever is available on the socket into the buffer.
 *
 * On the first call, if a pump is available, a poll descriptor for
 * APR_POLLIN is registered as the conditional for this pipe. Data is
 * then read in chunks of READ_BUFFER_SIZE and appended to the out
 * channel until a read fails or returns less than a full chunk.
 *
 * @return Returns STATUS_PRECONDITION_NOT_MET without a source
 * socket, STATUS_DONE (and sets eos) on end of file, STATUS_BREAK
 * when the read would block, STATUS_ERROR on any other failure, and
 * STATUS_OK otherwise.
 */
// virtual
LLIOPipe::EStatus LLIOSocketReader::process_impl(
	const LLChannelDescriptors& channels,
	buffer_ptr_t& buffer,
	bool& eos,
	LLSD& context,
	LLPumpIO* pump)
{
	PUMP_DEBUG;
	LLMemType m1(LLMemType::MTYPE_IO_TCP);
	if(!mSource) return STATUS_PRECONDITION_NOT_MET;
	if(!mInitialized)
	{
		PUMP_DEBUG;
		// Since the read will not block, it's ok to initialize and
		// attempt to read off the descriptor immediately.
		mInitialized = true;
		if(pump)
		{
			PUMP_DEBUG;
			lldebugs << "Initializing poll descriptor for LLIOSocketReader."
					 << llendl;
			apr_pollfd_t poll_fd;
			poll_fd.p = NULL;
			poll_fd.desc_type = APR_POLL_SOCKET;
			poll_fd.reqevents = APR_POLLIN;
			poll_fd.rtnevents = 0x0;
			poll_fd.desc.s = mSource->getSocket();
			poll_fd.client_data = NULL;
			pump->setConditional(this, &poll_fd);
		}
	}
	//if(!buffer)
	//{
	//	buffer = new LLBufferArray;
	//}
	PUMP_DEBUG;
	const apr_size_t READ_BUFFER_SIZE = 1024;
	char read_buf[READ_BUFFER_SIZE]; /*Flawfinder: ignore*/
	apr_size_t len;
	apr_status_t status = APR_SUCCESS;
	do
	{
		PUMP_DEBUG;
		// len is in/out: the capacity going in, the number of bytes
		// actually read coming out.
		len = READ_BUFFER_SIZE;
		status = apr_socket_recv(mSource->getSocket(), read_buf, &len);
		buffer->append(channels.out(), (U8*)read_buf, len);
	} while((APR_SUCCESS == status) && (READ_BUFFER_SIZE == len));
	lldebugs << "socket read status: " << status << llendl;
	LLIOPipe::EStatus rv = STATUS_OK;

	PUMP_DEBUG;
	// *FIX: Also need to check for broken pipe
	if(APR_STATUS_IS_EOF(status))
	{
		// *FIX: Should we shut down the socket read?
		if(pump)
		{
			pump->setConditional(this, NULL);
		}
		rv = STATUS_DONE;
		eos = true;
	}
	else if(APR_STATUS_IS_EAGAIN(status))
	{
		// everything is fine, but we can terminate this process pump.
		rv = STATUS_BREAK;
	}
	else
	{
		if(ll_apr_warn_status(status))
		{
			rv = STATUS_ERROR;
		}
	}
	PUMP_DEBUG;
	return rv;
}

///
/// LLIOSocketWriter
///

LLIOSocketWriter::LLIOSocketWriter(LLSocket::ptr_t socket) :
	mDestination(socket),
	mLastWritten(NULL),
	mInitialized(false)
{
	LLMemType m1(LLMemType::MTYPE_IO_TCP);
}

LLIOSocketWriter::~LLIOSocketWriter()
{
	LLMemType m1(LLMemType::MTYPE_IO_TCP);
	//lldebugs << "Destroying LLIOSocketWriter" << llendl;
}

/**
 * @brief Write the pending segments on the in channel to the socket.
 *
 * On the first call, if a pump is available, a poll descriptor for
 * APR_POLLOUT is registered as the conditional for this pipe. Writing
 * resumes after mLastWritten, which tracks the address of the last
 * byte handed to the socket.
 *
 * @return Returns STATUS_PRECONDITION_NOT_MET without a destination
 * socket, STATUS_ERROR if the send failed for a reason other than
 * 'would block', STATUS_DONE once every segment was written and eos
 * is set, and STATUS_OK otherwise.
 */
// virtual
LLIOPipe::EStatus LLIOSocketWriter::process_impl(
	const LLChannelDescriptors& channels,
	buffer_ptr_t& buffer,
	bool& eos,
	LLSD& context,
	LLPumpIO* pump)
{
	PUMP_DEBUG;
	LLMemType m1(LLMemType::MTYPE_IO_TCP);
	if(!mDestination) return STATUS_PRECONDITION_NOT_MET;
	if(!mInitialized)
	{
		PUMP_DEBUG;
		// Since the write will not block, it's ok to initialize and
		// attempt to write immediately.
		mInitialized = true;
		if(pump)
		{
			PUMP_DEBUG;
			lldebugs << "Initializing poll descriptor for LLIOSocketWriter."
					 << llendl;
			apr_pollfd_t poll_fd;
			poll_fd.p = NULL;
			poll_fd.desc_type = APR_POLL_SOCKET;
			poll_fd.reqevents = APR_POLLOUT;
			poll_fd.rtnevents = 0x0;
			poll_fd.desc.s = mDestination->getSocket();
			poll_fd.client_data = NULL;
			pump->setConditional(this, &poll_fd);
		}
	}

	PUMP_DEBUG;
	// *FIX: Some sort of writev implementation would be much more
	// efficient - not only because writev() is better, but also
	// because we won't have to do as much work to find the start
	// address.
	LLBufferArray::segment_iterator_t it;
	LLBufferArray::segment_iterator_t end = buffer->endSegment();
	LLSegment segment;
	it = buffer->constructSegmentAfter(mLastWritten, segment);
	/*
	if(NULL == mLastWritten)
	{
		it = buffer->beginSegment();
		segment = (*it);
	}
	else
	{
		it = buffer->getSegment(mLastWritten);
		segment = (*it);
		S32 size = segment.size();
		U8* data = segment.data();
		if((data + size) == mLastWritten)
		{
			++it;
			segment = (*it);
		}
		else
		{
			// *FIX: check the math on this one
			segment = LLSegment(
				(*it).getChannelMask(),
				mLastWritten + 1,
				size - (mLastWritten - data));
		}
	}
	*/

	PUMP_DEBUG;
	apr_size_t len;
	bool done = false;
	while(it != end)
	{
		PUMP_DEBUG;
		if((*it).isOnChannel(channels.in()))
		{
			PUMP_DEBUG;
			// len is in/out: the number of bytes offered going in,
			// the number of bytes actually sent coming out.
			len = (apr_size_t)segment.size();
			apr_status_t status = apr_socket_send(
				mDestination->getSocket(),
				(const char*)segment.data(),
				&len);
			// Only advance the marker when something was actually
			// sent. Otherwise it would end up one byte in front of
			// the segment and the next pass could not resume from
			// the right place.
			if(len > 0)
			{
				mLastWritten = segment.data() + len - 1;
			}
			if(APR_STATUS_IS_EAGAIN(status))
			{
				// The socket can not take any more data right now.
				// Stop here and pick up after mLastWritten the next
				// time the chain is pumped.
				break;
			}
			if(ll_apr_warn_status(status))
			{
				// The socket is broken, so there is no point in
				// trying to write the rest.
				return STATUS_ERROR;
			}
			PUMP_DEBUG;
			if((S32)len < segment.size())
			{
				// partial write - the remainder is sent next time.
				break;
			}
		}
		++it;
		if(it != end)
		{
			segment = (*it);
		}
		else
		{
			done = true;
		}
	}
	PUMP_DEBUG;
	if(done && eos)
	{
		return STATUS_DONE;
	}
	return STATUS_OK;
}

///
/// LLIOServerSocket
///

LLIOServerSocket::LLIOServerSocket(
	apr_pool_t* pool,
	LLIOServerSocket::socket_t listener,
	factory_t factory) :
	mPool(pool),
	mListenSocket(listener),
	mReactor(factory),
	mInitialized(false),
	mResponseTimeout(DEFAULT_CHAIN_EXPIRY_SECS)
{
	LLMemType m1(LLMemType::MTYPE_IO_TCP);
}

LLIOServerSocket::~LLIOServerSocket()
{
	LLMemType m1(LLMemType::MTYPE_IO_TCP);
	//lldebugs << "Destroying LLIOServerSocket" << llendl;
}

/**
 * @brief Set the timeout passed to the pump along with every chain
 * built for an accepted connection.
 */
void LLIOServerSocket::setResponseTimeout(F32 timeout_secs)
{
	mResponseTimeout = timeout_secs;
}

/**
 * @brief Accept an incoming connection and add a chain for it.
 *
 * The first call only registers a poll descriptor for APR_POLLIN on
 * the listen socket. Every later call accepts one connection into a
 * fresh sub-pool of mPool, builds a chain of socket reader, the pipes
 * supplied by the reactor, and socket writer, and adds that chain to
 * the pump with the response timeout.
 *
 * @return Returns STATUS_ERROR without a pump. Otherwise this always
 * returns STATUS_OK, even if the accept failed, so that the server
 * socket stays on the pump.
 */
// virtual
LLIOPipe::EStatus LLIOServerSocket::process_impl(
	const LLChannelDescriptors& channels,
	buffer_ptr_t& buffer,
	bool& eos,
	LLSD& context,
	LLPumpIO* pump)
{
	PUMP_DEBUG;
	LLMemType m1(LLMemType::MTYPE_IO_TCP);
	if(!pump)
	{
		llwarns << "Need a pump for server socket." << llendl;
		return STATUS_ERROR;
	}
	if(!mInitialized)
	{
		PUMP_DEBUG;
		// This segment sets up the pump so that we do not call
		// process again until we have an incoming read, aka connect()
		// from a remote host.
		lldebugs << "Initializing poll descriptor for LLIOServerSocket."
				 << llendl;
		apr_pollfd_t poll_fd;
		poll_fd.p = NULL;
		poll_fd.desc_type = APR_POLL_SOCKET;
		poll_fd.reqevents = APR_POLLIN;
		poll_fd.rtnevents = 0x0;
		poll_fd.desc.s = mListenSocket->getSocket();
		poll_fd.client_data = NULL;
		pump->setConditional(this, &poll_fd);
		mInitialized = true;
		return STATUS_OK;
	}

	// we are initialized, and told to process, so we must have a
	// socket waiting for a connection.
	lldebugs << "accepting socket" << llendl;

	PUMP_DEBUG;
	apr_pool_t* new_pool = NULL;
	apr_status_t status = apr_pool_create(&new_pool, mPool);
	if(ll_apr_warn_status(status))
	{
		if(new_pool) apr_pool_destroy(new_pool);
		llwarns << "Unable to create linden socket." << llendl;
		// This needs to always return success, lest it get removed
		// from the pump.
		return STATUS_OK;
	}
	apr_socket_t* socket = NULL;
	status = apr_socket_accept(
		&socket,
		mListenSocket->getSocket(),
		new_pool);
	if(ll_apr_warn_status(status) || !socket)
	{
		// Nothing took over the pool, so release it here. Otherwise
		// every failed accept would pile up another sub-pool on
		// mPool.
		apr_pool_destroy(new_pool);
		llwarns << "Unable to create linden socket." << llendl;
		return STATUS_OK;
	}

	// The LLSocket holds socket and new_pool from here on and
	// releases both in its destructor.
	LLSocket::ptr_t llsocket(LLSocket::create(socket, new_pool));
	if(llsocket)
	{
		PUMP_DEBUG;
		LLPumpIO::chain_t chain;
		chain.push_back(LLIOPipe::ptr_t(new LLIOSocketReader(llsocket)));
		if(mReactor->build(chain, LLSD()))
		{
			chain.push_back(LLIOPipe::ptr_t(new LLIOSocketWriter(llsocket)));
			pump->addChain(chain, mResponseTimeout);
		}
		else
		{
			llwarns << "Unable to build reactor to socket." << llendl;
		}
	}
	else
	{
		llwarns << "Unable to create linden socket." << llendl;
	}

	PUMP_DEBUG;
	// This needs to always return success, lest it get removed from
	// the pump.
	return STATUS_OK;
}


#if 0
LLIODataSocket::LLIODataSocket(
	U16 suggested_port,
	U16 start_discovery_port,
	apr_pool_t* pool) : 
	mSocket(NULL)
{
	if(!pool || (PORT_INVALID == suggested_port)) return;
	if(ll_apr_warn_status(apr_socket_create(&mSocket, APR_INET, SOCK_DGRAM, APR_PROTO_UDP, pool))) return;
	apr_sockaddr_t* sa = NULL;
	if(ll_apr_warn_status(apr_sockaddr_info_get(&sa, APR_ANYADDR, APR_UNSPEC, suggested_port, 0, pool))) return;
	apr_status_t status = apr_socket_bind(mSocket, sa);
	if((start_discovery_port > 0) && is_addr_in_use(status))
	{
		const U16 MAX_ATTEMPT_PORTS = 50;
		for(U16 attempt_port = start_discovery_port;
			attempt_port < (start_discovery_port + MAX_ATTEMPT_PORTS);
			++attempt_port)
		{
			sa->port = attempt_port;
			sa->sa.sin.sin_port = htons(attempt_port);
			status = apr_socket_bind(mSocket, sa);
			if(APR_SUCCESS == status) break;
			if(is_addr_in_use(status)) continue;
			(void)ll_apr_warn_status(status);
		}
	}
	if(ll_apr_warn_status(status)) return;
	if(sa->port)
	{
		lldebugs << "Bound datagram socket to port: " << sa->port << llendl;
		mPort = sa->port;
	}
	else
	{
		mPort = LLIOSocket::PORT_EPHEMERAL;
	}

	// set up the socket options
	ll_apr_warn_status(apr_socket_timeout_set(mSocket, 0));
	ll_apr_warn_status(apr_socket_opt_set(mSocket, APR_SO_SNDBUF, LL_SEND_BUFFER_SIZE));
	ll_apr_warn_status(apr_socket_opt_set(mSocket, APR_SO_RCVBUF, LL_RECV_BUFFER_SIZE));
}

LLIODataSocket::~LLIODataSocket()
{
}


#endif