diff options
author | Adam <Adam@anope.org> | 2011-08-21 13:38:42 -0400 |
---|---|---|
committer | Adam <Adam@anope.org> | 2011-09-10 01:55:09 -0400 |
commit | 2eb708e5ad8b259876d24d828f7472b77864c256 (patch) | |
tree | bed6b70d4bc67eb413453a116e77f8f724cdf3fd | |
parent | 4fcb371bc8813cd647b7769a64d586e3a57d684d (diff) |
Cleaned up some of the socket code, cleaned up the pipe engines, added support for binary sockets, and cleaned up the asynch connect/accept code
-rw-r--r-- | include/extern.h | 11 | ||||
-rw-r--r-- | include/sockets.h | 169 | ||||
-rw-r--r-- | modules/extra/m_dnsbl.cpp | 13 | ||||
-rw-r--r-- | modules/extra/m_ssl.cpp | 202 | ||||
-rw-r--r-- | modules/extra/m_xmlrpc.cpp | 6 | ||||
-rw-r--r-- | modules/extra/xmlrpc.h | 4 | ||||
-rw-r--r-- | src/main.cpp | 103 | ||||
-rw-r--r-- | src/socket_clients.cpp | 96 | ||||
-rw-r--r-- | src/socket_transport.cpp | 185 | ||||
-rw-r--r-- | src/socketengines/pipeengine_eventfd.cpp | 49 | ||||
-rw-r--r-- | src/socketengines/pipeengine_pipe.cpp | 49 | ||||
-rw-r--r-- | src/socketengines/pipeengine_win32.cpp | 83 | ||||
-rw-r--r-- | src/socketengines/socketengine_epoll.cpp | 47 | ||||
-rw-r--r-- | src/socketengines/socketengine_poll.cpp | 34 | ||||
-rw-r--r-- | src/socketengines/socketengine_select.cpp | 16 | ||||
-rw-r--r-- | src/sockets.cpp | 376 |
16 files changed, 728 insertions, 715 deletions
diff --git a/include/extern.h b/include/extern.h index 0b90c6e14..1772c965c 100644 --- a/include/extern.h +++ b/include/extern.h @@ -106,7 +106,16 @@ E bool restarting; E Anope::string quitmsg; E time_t start_time; -E ConnectionSocket *UplinkSock; +class UplinkSocket : public ConnectionSocket, public BufferedSocket +{ + public: + UplinkSocket(); + ~UplinkSocket(); + bool Read(const Anope::string &); + void OnConnect(); + void OnError(const Anope::string &); +}; +E UplinkSocket *UplinkSock; E int CurrentUplink; E void save_databases(); diff --git a/include/sockets.h b/include/sockets.h index eb86130b5..b8d6033ee 100644 --- a/include/sockets.h +++ b/include/sockets.h @@ -106,22 +106,17 @@ class SocketException : public CoreException virtual ~SocketException() throw() { } }; -enum SocketType -{ - SOCKTYPE_BASE, - SOCKTYPE_BUFFERED, - SOCKTYPE_CONNECTION, - SOCKTYPE_CLIENT, - SOCKTYPE_LISTEN -}; - enum SocketFlag { SF_DEAD, - SF_WRITABLE + SF_WRITABLE, + SF_CONNECTING, + SF_CONNECTED, + SF_ACCEPTING, + SF_ACCEPTED }; -static const Anope::string SocketFlagStrings[] = { "SF_DEAD", "SF_WRITABLE", "" }; +static const Anope::string SocketFlagStrings[] = { "SF_DEAD", "SF_WRITABLE", "SF_CONNECTING", "SF_CONNECTED", "SF_ACCEPTING", "SF_ACCEPTED", "" }; class Socket; class ClientSocket; @@ -140,11 +135,12 @@ class CoreExport SocketIO virtual int Recv(Socket *s, char *buf, size_t sz); /** Write something to the socket - * @param s The socket - * @param buf What to write - * @return Number of bytes written + * @param s The socket + * @param buf The data to write + * @param size The length of the data */ - virtual int Send(Socket *s, const Anope::string &buf); + virtual int Send(Socket *s, const char *buf, size_t sz); + int Send(Socket *s, const Anope::string &buf); /** Accept a connection from a socket * @param s The socket @@ -152,11 +148,11 @@ class CoreExport SocketIO */ virtual ClientSocket *Accept(ListenSocket *s); - /** Check if a connection has been accepted - * @param s The client socket - * @return -1 on error, 0 to wait, 1 on success + /** Finished accepting a connection from a socket + * @param s The socket + * @return SF_ACCEPTED if accepted, SF_ACCEPTING if still in process, SF_DEAD on error */ - virtual int Accepted(ClientSocket *cs); + virtual SocketFlag FinishAccept(ClientSocket *cs); /** Bind a socket * @param s The socket @@ -172,18 +168,18 @@ class CoreExport SocketIO */ virtual void Connect(ConnectionSocket *s, const Anope::string &target, int port); - /** Check if this socket is connected + /** Called to potentially finish a pending connection * @param s The socket - * @return -1 for error, 0 for wait, 1 for connected + * @return SF_CONNECTED on success, SF_CONNECTING if still pending, and SF_DEAD on error. */ - virtual int Connected(ConnectionSocket *s); + virtual SocketFlag FinishConnect(ConnectionSocket *s); /** Called when the socket is destructing */ virtual void Destroy() { } }; -class CoreExport Socket : public Flags<SocketFlag, 2> +class CoreExport Socket : public Flags<SocketFlag> { protected: /* Socket FD */ @@ -198,19 +194,16 @@ class CoreExport Socket : public Flags<SocketFlag, 2> /* I/O functions used for this socket */ SocketIO *IO; - /* Type this socket is */ - SocketType Type; - - /** Empty constructor, used for things such as the pipe socket + /** Empty constructor, should not be called. */ Socket(); /** Default constructor - * @param sock The socket to use, 0 if we need to create our own + * @param sock The socket to use, -1 if we need to create our own * @param ipv6 true if using ipv6 * @param type The socket type, defaults to SOCK_STREAM */ - Socket(int sock, bool ipv6, int type = SOCK_STREAM); + Socket(int sock, bool ipv6 = false, int type = SOCK_STREAM); /** Default destructor */ @@ -242,6 +235,11 @@ class CoreExport Socket : public Flags<SocketFlag, 2> */ void Bind(const Anope::string &ip, int port = 0); + /** Called when there either is a read or write event. + * @return true to continue to call ProcessRead/ProcessWrite, false to not continue + */ + virtual bool Process(); + /** Called when there is something to be received for this socket * @return true on success, false to drop this socket */ @@ -258,7 +256,7 @@ class CoreExport Socket : public Flags<SocketFlag, 2> virtual void ProcessError(); }; -class CoreExport BufferedSocket : public Socket +class CoreExport BufferedSocket : public virtual Socket { protected: /* Things to be written to the socket */ @@ -269,16 +267,9 @@ class CoreExport BufferedSocket : public Socket int RecvLen; public: - /** Blank constructor - */ - BufferedSocket(); - /** Constructor - * @param fd FD to use - * @param ipv6 true for ipv6 - * @param type socket type, defaults to SOCK_STREAM */ - BufferedSocket(int fd, bool ipv6, int type = SOCK_STREAM); + BufferedSocket(); /** Default destructor */ @@ -317,6 +308,52 @@ class CoreExport BufferedSocket : public Socket int WriteBufferLen() const; }; +class CoreExport BinarySocket : public virtual Socket +{ + struct DataBlock + { + char *buf; + size_t len; + + DataBlock(const char *b, size_t l); + ~DataBlock(); + }; + + std::deque<DataBlock *> WriteBuffer; + + public: + /** Constructor + */ + BinarySocket(); + + /** Default destructor + */ + virtual ~BinarySocket(); + + /** Called when there is something to be received for this socket + * @return true on success, false to drop this socket + */ + bool ProcessRead(); + + /** Called when the socket is ready to be written to + * @return true on success, false to drop this socket + */ + bool ProcessWrite(); + + /** Write data to the socket + * @param buffer The data to write + * @param l The length of the data + */ + void Write(const char *buffer, size_t l); + + /** Called with data from the socket + * @param buffer The data + * @param l The length of buffer + * @return true to continue reading, false to drop the socket + */ + virtual bool Read(const char *buffer, size_t l); +}; + class CoreExport ListenSocket : public Socket { public: @@ -341,22 +378,18 @@ class CoreExport ListenSocket : public Socket * @param addr The sockaddr for where the connection came from * @return The new socket */ - virtual ClientSocket *OnAccept(int fd, const sockaddrs &addr); + virtual ClientSocket *OnAccept(int fd, const sockaddrs &addr) = 0; }; -class CoreExport ConnectionSocket : public BufferedSocket +class CoreExport ConnectionSocket : public virtual Socket { public: /* Sockaddrs for connection ip/port */ sockaddrs conaddr; - /* True if connected */ - bool connected; /** Constructor - * @param ipv6 true to use IPv6 - * @param type The socket type, defaults to SOCK_STREAM */ - ConnectionSocket(bool ipv6 = false, int type = SOCK_STREAM); + ConnectionSocket(); /** Connect the socket * @param TargetHost The target host to connect to @@ -364,15 +397,11 @@ class CoreExport ConnectionSocket : public BufferedSocket */ void Connect(const Anope::string &TargetHost, int Port); - /** Called when there is something to be received for this socket - * @return true on success, false to drop this socket - */ - bool ProcessRead(); - - /** Called when the socket is ready to be written to - * @return true on success, false to drop this socket + /** Called when there either is a read or write event. + * Used to determine whether or not this socket is connected yet. + * @return true to continue to call ProcessRead/ProcessWrite, false to not continue */ - bool ProcessWrite(); + bool Process(); /** Called when there is an error for this socket * @return true on success, false to drop this socket @@ -389,7 +418,7 @@ class CoreExport ConnectionSocket : public BufferedSocket virtual void OnError(const Anope::string &error); }; -class CoreExport ClientSocket : public BufferedSocket +class CoreExport ClientSocket : public virtual Socket { public: /* Listen socket this connection came from */ @@ -399,23 +428,31 @@ class CoreExport ClientSocket : public BufferedSocket /** Constructor * @param ls Listen socket this connection is from - * @param fd New FD for this socket * @param addr Address the connection came from */ - ClientSocket(ListenSocket *ls, int fd, const sockaddrs &addr); + ClientSocket(ListenSocket *ls, const sockaddrs &addr); - /** Called when there is something to be received for this socket - * @return true on success, false to drop this socket + /** Called when there either is a read or write event. + * Used to determine whether or not this socket is connected yet. + * @return true to continue to call ProcessRead/ProcessWrite, false to not continue */ - bool ProcessRead(); + bool Process(); - /** Called when the socket is ready to be written to + /** Called when there is an error for this socket * @return true on success, false to drop this socket */ - bool ProcessWrite(); + void ProcessError(); + + /** Called when a client has been accepted() successfully. + */ + virtual void OnAccept(); + + /** Called when there was an error accepting the client + */ + virtual void OnError(const Anope::string &error); }; -class CoreExport Pipe : public BufferedSocket +class CoreExport Pipe : public Socket { public: /** The FD of the write pipe (if this isn't evenfd) @@ -427,13 +464,13 @@ class CoreExport Pipe : public BufferedSocket */ Pipe(); - /** Called when data is to be read + /** Destructor */ - bool ProcessRead(); + ~Pipe(); - /** Function that calls OnNotify + /** Called when data is to be read */ - bool Read(const Anope::string &); + bool ProcessRead(); /** Called when this pipe needs to be woken up */ diff --git a/modules/extra/m_dnsbl.cpp b/modules/extra/m_dnsbl.cpp index f62908b39..7e5342d3c 100644 --- a/modules/extra/m_dnsbl.cpp +++ b/modules/extra/m_dnsbl.cpp @@ -56,20 +56,19 @@ class DNSBLResolver : public DNSRequest reason = reason.replace_all_cs("%r", record_reason); reason = reason.replace_all_cs("%N", Config->NetworkName); - XLine *x = NULL; BotInfo *operserv = findbot(Config->OperServ); - if (this->add_to_akill && akills && (x = akills->Add(Anope::string("*@") + user->host, Config->OperServ, Anope::CurTime + this->blacklist.bantime, reason))) + Log(operserv) << "DNSBL: " << user->GetMask() << " appears in " << this->blacklist.name; + if (this->add_to_akill && akills) { - Log(operserv) << "DNSBL: " << user->GetMask() << " appears in " << this->blacklist.name; + XLine *x = akills->Add("*@" + user->host, Config->OperServ, Anope::CurTime + this->blacklist.bantime, reason); /* If AkillOnAdd is disabled send it anyway, noone wants bots around... */ if (!Config->AkillOnAdd) - ircdproto->SendAkill(user, x); + akills->Send(NULL, x); } else { - Log(operserv) << "DNSBL: " << user->GetMask() << " appears in " << this->blacklist.name << "(" << reason << ")"; - XLine xline(Anope::string("*@") + user->host, Config->OperServ, Anope::CurTime + this->blacklist.bantime, reason); - ircdproto->SendAkill(user, &xline); + XLine xline("*@" + user->host, Config->OperServ, Anope::CurTime + this->blacklist.bantime, reason); + ircdproto->SendAkill(NULL, &xline); } } }; diff --git a/modules/extra/m_ssl.cpp b/modules/extra/m_ssl.cpp index 41bc49993..5143d964a 100644 --- a/modules/extra/m_ssl.cpp +++ b/modules/extra/m_ssl.cpp @@ -31,8 +31,6 @@ class SSLSocketIO : public SocketIO public: /* The SSL socket for this socket */ SSL *sslsock; - /* -1 if not, 0 if waiting, 1 if true */ - int connected, accepted; /** Constructor */ @@ -46,12 +44,12 @@ class SSLSocketIO : public SocketIO */ int Recv(Socket *s, char *buf, size_t sz); - /** Really write something to the socket - * @param s The socket - * @param buf What to write - * @return Number of bytes written + /** Write something to the socket + * @param s The socket + * @param buf The data to write + * @param size The length of the data */ - int Send(Socket *s, const Anope::string &buf); + int Send(Socket *s, const char *buf, size_t sz); /** Accept a connection from a socket * @param s The socket @@ -59,11 +57,11 @@ class SSLSocketIO : public SocketIO */ ClientSocket *Accept(ListenSocket *s); - /** Check if a connection has been accepted - * @param s The client socket - * @return -1 on error, 0 to wait, 1 on success + /** Finished accepting a connection from a socket + * @param s The socket + * @return SF_ACCEPTED if accepted, SF_ACCEPTING if still in process, SF_DEAD on error */ - int Accepted(ClientSocket *cs); + SocketFlag FinishAccept(ClientSocket *cs); /** Connect the socket * @param s THe socket @@ -72,11 +70,11 @@ class SSLSocketIO : public SocketIO */ void Connect(ConnectionSocket *s, const Anope::string &target, int port); - /** Check if this socket is connected + /** Called to potentially finish a pending connection * @param s The socket - * @return -1 for error, 0 for wait, 1 for connected + * @return SF_CONNECTED on success, SF_CONNECTING if still pending, and SF_DEAD on error. */ - int Connected(ConnectionSocket *s); + SocketFlag FinishConnect(ConnectionSocket *s); /** Called when the socket is destructing */ @@ -194,7 +192,7 @@ void MySSLService::Init(Socket *s) s->IO = new SSLSocketIO(); } -SSLSocketIO::SSLSocketIO() : connected(-1), accepted(-1) +SSLSocketIO::SSLSocketIO() { this->sslsock = NULL; } @@ -206,9 +204,9 @@ int SSLSocketIO::Recv(Socket *s, char *buf, size_t sz) return i; } -int SSLSocketIO::Send(Socket *s, const Anope::string &buf) +int SSLSocketIO::Send(Socket *s, const char *buf, size_t sz) { - int i = SSL_write(this->sslsock, buf.c_str(), buf.length()); + int i = SSL_write(this->sslsock, buf, sz); TotalWritten += i; return i; } @@ -217,8 +215,20 @@ ClientSocket *SSLSocketIO::Accept(ListenSocket *s) { if (s->IO == &normalSocketIO) throw SocketException("Attempting to accept on uninitialized socket with SSL"); - - ClientSocket *newsocket = normalSocketIO.Accept(s); + + sockaddrs conaddr; + + socklen_t size = sizeof(conaddr); + int newsock = accept(s->GetFD(), &conaddr.sa, &size); + +#ifndef INVALID_SOCKET + const int INVALID_SOCKET = -1; +#endif + + if (newsock < 0 || newsock == INVALID_SOCKET) + throw SocketException("Unable to accept connection: " + Anope::LastError()); + + ClientSocket *newsocket = s->OnAccept(newsock, conaddr); me->service.Init(newsocket); SSLSocketIO *IO = debug_cast<SSLSocketIO *>(newsocket->IO); @@ -231,47 +241,47 @@ ClientSocket *SSLSocketIO::Accept(ListenSocket *s) if (!SSL_set_fd(IO->sslsock, newsocket->GetFD())) throw SocketException("Unable to set SSL fd"); - int ret = SSL_accept(IO->sslsock); - if (ret <= 0) - { - IO->accepted = 0; - int error = SSL_get_error(IO->sslsock, ret); - if (ret == -1 && (error == SSL_ERROR_WANT_READ || error == SSL_ERROR_WANT_WRITE)) - { - SocketEngine::MarkWritable(newsocket); - return newsocket; - } - - throw SocketException("Unable to accept new SSL connection: " + Anope::string(ERR_error_string(ERR_get_error(), NULL))); - } + newsocket->SetFlag(SF_ACCEPTING); + this->FinishAccept(newsocket); - IO->accepted = 1; return newsocket; } -int SSLSocketIO::Accepted(ClientSocket *cs) +SocketFlag SSLSocketIO::FinishAccept(ClientSocket *cs) { - SSLSocketIO *IO = debug_cast<SSLSocketIO *>(cs->IO); + if (cs->IO == &normalSocketIO) + throw SocketException("Attempting to finish connect uninitialized socket with SSL"); + else if (cs->HasFlag(SF_ACCEPTED)) + return SF_ACCEPTED; + else if (!cs->HasFlag(SF_ACCEPTING)) + throw SocketException("SSLSocketIO::FinishAccept called for a socket not accepted nor accepting?"); - if (IO->accepted == 0) + SSLSocketIO *IO = debug_cast<SSLSocketIO *>(cs->IO); + + int ret = SSL_accept(IO->sslsock); + if (ret <= 0) { - int ret = SSL_accept(IO->sslsock); - if (ret <= 0) + int error = SSL_get_error(IO->sslsock, ret); + if (ret == -1 && (error == SSL_ERROR_WANT_READ || error == SSL_ERROR_WANT_WRITE)) { - int error = SSL_get_error(IO->sslsock, ret); - if (ret == -1 && (error == SSL_ERROR_WANT_READ || error == SSL_ERROR_WANT_WRITE)) - { - SocketEngine::MarkWritable(cs); - return 0; - } - - return -1; + SocketEngine::MarkWritable(cs); + return SF_ACCEPTING; + } + else + { + cs->OnError(ERR_error_string(ERR_get_error(), NULL)); + cs->SetFlag(SF_DEAD); + cs->UnsetFlag(SF_ACCEPTING); + return SF_DEAD; } - IO->accepted = 1; - return 0; } - - return IO->accepted; + else + { + cs->SetFlag(SF_ACCEPTED); + cs->UnsetFlag(SF_ACCEPTING); + cs->OnAccept(); + return SF_ACCEPTED; + } } void SSLSocketIO::Connect(ConnectionSocket *s, const Anope::string &target, int port) @@ -279,62 +289,78 @@ void SSLSocketIO::Connect(ConnectionSocket *s, const Anope::string &target, int if (s->IO == &normalSocketIO) throw SocketException("Attempting to connect uninitialized socket with SSL"); - normalSocketIO.Connect(s, target, port); - - SSLSocketIO *IO = debug_cast<SSLSocketIO *>(s->IO); + s->UnsetFlag(SF_CONNECTING); + s->UnsetFlag(SF_CONNECTED); - IO->sslsock = SSL_new(client_ctx); - if (!IO->sslsock) - throw SocketException("Unable to initialize SSL socket"); - - if (!SSL_set_fd(IO->sslsock, s->GetFD())) - throw SocketException("Unable to set SSL fd"); - - int ret = SSL_connect(IO->sslsock); - if (ret <= 0) + s->conaddr.pton(s->IsIPv6() ? AF_INET6 : AF_INET, target, port); + int c = connect(s->GetFD(), &s->conaddr.sa, s->conaddr.size()); + if (c == -1) { - IO->connected = 0; - int error = SSL_get_error(IO->sslsock, ret); - if (ret == -1 && (error == SSL_ERROR_WANT_READ || error == SSL_ERROR_WANT_WRITE)) + if (Anope::LastErrorCode() != EINPROGRESS) + { + s->OnError(Anope::LastError()); + s->SetFlag(SF_DEAD); + return; + } + else { SocketEngine::MarkWritable(s); + s->SetFlag(SF_CONNECTING); return; } - - s->ProcessError(); } - - IO->connected = 1; + else + { + s->SetFlag(SF_CONNECTING); + this->FinishConnect(s); + } } -int SSLSocketIO::Connected(ConnectionSocket *s) +SocketFlag SSLSocketIO::FinishConnect(ConnectionSocket *s) { if (s->IO == &normalSocketIO) - throw SocketException("Connected() called for non ssl socket?"); - - int i = SocketIO::Connected(s); - if (i != 1) - return i; + throw SocketException("Attempting to finish connect uninitialized socket with SSL"); + else if (s->HasFlag(SF_CONNECTED)) + return SF_CONNECTED; + else if (!s->HasFlag(SF_CONNECTING)) + throw SocketException("SSLSocketIO::FinishConnect called for a socket not connected nor connecting?"); SSLSocketIO *IO = debug_cast<SSLSocketIO *>(s->IO); - if (IO->connected == 0) + if (IO->sslsock == NULL) { - int ret = SSL_connect(IO->sslsock); - if (ret <= 0) - { - int error = SSL_get_error(IO->sslsock, ret); - if (ret == -1 && (error == SSL_ERROR_WANT_READ || error == SSL_ERROR_WANT_WRITE)) - return 0; + IO->sslsock = SSL_new(client_ctx); + if (!IO->sslsock) + throw SocketException("Unable to initialize SSL socket"); - s->ProcessError(); - return -1; - } - IO->connected = 1; - return 0; // poll for next read/write (which will be real), don't assume ones available + if (!SSL_set_fd(IO->sslsock, s->GetFD())) + throw SocketException("Unable to set SSL fd"); } - return IO->connected; + int ret = SSL_connect(IO->sslsock); + if (ret <= 0) + { + int error = SSL_get_error(IO->sslsock, ret); + if (ret == -1 && (error == SSL_ERROR_WANT_READ || error == SSL_ERROR_WANT_WRITE)) + { + SocketEngine::MarkWritable(s); + return SF_CONNECTING; + } + else + { + s->OnError(ERR_error_string(ERR_get_error(), NULL)); + s->UnsetFlag(SF_CONNECTING); + s->SetFlag(SF_DEAD); + return SF_DEAD; + } + } + else + { + s->UnsetFlag(SF_CONNECTING); + s->SetFlag(SF_CONNECTED); + s->OnConnect(); + return SF_CONNECTED; + } } void SSLSocketIO::Destroy() diff --git a/modules/extra/m_xmlrpc.cpp b/modules/extra/m_xmlrpc.cpp index 9d301d684..38a733c7c 100644 --- a/modules/extra/m_xmlrpc.cpp +++ b/modules/extra/m_xmlrpc.cpp @@ -9,7 +9,7 @@ class MyXMLRPCClientSocket : public XMLRPCClientSocket /* Used to skip the (optional) HTTP header, which we really don't care about */ bool in_query; public: - MyXMLRPCClientSocket(XMLRPCListenSocket *ls, int fd, const sockaddrs &addr) : XMLRPCClientSocket(ls, fd, addr), in_query(false) + MyXMLRPCClientSocket(XMLRPCListenSocket *ls, int fd, const sockaddrs &addr) : Socket(fd, ls->IsIPv6()), XMLRPCClientSocket(ls, addr), in_query(false) { } @@ -245,9 +245,9 @@ class ModuleXMLRPC : public Module Socket *s = it->second; ++it; - if (s->Type == SOCKTYPE_CLIENT) + ClientSocket *cs = dynamic_cast<ClientSocket *>(s); + if (cs != NULL) { - ClientSocket *cs = debug_cast<ClientSocket *>(s); for (unsigned i = 0; i < listen_sockets.size(); ++i) if (cs->LS == listen_sockets[i]) { diff --git a/modules/extra/xmlrpc.h b/modules/extra/xmlrpc.h index 38f548c4f..364617ecc 100644 --- a/modules/extra/xmlrpc.h +++ b/modules/extra/xmlrpc.h @@ -3,13 +3,13 @@ class XMLRPCClientSocket; class XMLRPCListenSocket; class XMLRPCServiceInterface; -class XMLRPCClientSocket : public ClientSocket +class XMLRPCClientSocket : public ClientSocket, public BufferedSocket { protected: Anope::string query; public: - XMLRPCClientSocket(ListenSocket *ls, int fd, const sockaddrs &addr) : ClientSocket(ls, fd, addr) { } + XMLRPCClientSocket(ListenSocket *ls, const sockaddrs &addr) : ClientSocket(ls, addr), BufferedSocket() { } virtual ~XMLRPCClientSocket() { } diff --git a/src/main.cpp b/src/main.cpp index 3730c60ea..b00a6ee6e 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -81,7 +81,7 @@ class UpdateTimer : public Timer } }; -ConnectionSocket *UplinkSock = NULL; +UplinkSocket *UplinkSock = NULL; int CurrentUplink = 0; static void Connect(); @@ -105,77 +105,72 @@ class ReconnectTimer : public Timer } }; -class UplinkSocket : public ConnectionSocket +UplinkSocket::UplinkSocket() : Socket(-1, Config->Uplinks[CurrentUplink]->ipv6), ConnectionSocket(), BufferedSocket() { - public: - UplinkSocket() : ConnectionSocket(Config->Uplinks[CurrentUplink]->ipv6) - { - UplinkSock = this; - } + UplinkSock = this; +} - ~UplinkSocket() +UplinkSocket::~UplinkSocket() +{ + if (ircdproto && Me && !Me->GetLinks().empty() && Me->GetLinks()[0]->IsSynced()) { - if (ircdproto && Me && !Me->GetLinks().empty() && Me->GetLinks()[0]->IsSynced()) + FOREACH_MOD(I_OnServerDisconnect, OnServerDisconnect()); + + for (Anope::insensitive_map<User *>::const_iterator it = UserListByNick.begin(); it != UserListByNick.end(); ++it) { - FOREACH_MOD(I_OnServerDisconnect, OnServerDisconnect()); + User *u = it->second; - for (Anope::insensitive_map<User *>::const_iterator it = UserListByNick.begin(); it != UserListByNick.end(); ++it) + if (u->server == Me) { - User *u = it->second; - - if (u->server == Me) - { - /* Don't use quitmsg here, it may contain information you don't want people to see */ - ircdproto->SendQuit(u, "Shutting down"); - BotInfo *bi = findbot(u->nick); - if (bi != NULL) - bi->introduced = false; - } + /* Don't use quitmsg here, it may contain information you don't want people to see */ + ircdproto->SendQuit(u, "Shutting down"); + BotInfo *bi = findbot(u->nick); + if (bi != NULL) + bi->introduced = false; } - - ircdproto->SendSquit(Config->ServerName, quitmsg); - - this->ProcessWrite(); // Write out the last bit } - for (unsigned i = Me->GetLinks().size(); i > 0; --i) - if (!Me->GetLinks()[i - 1]->HasFlag(SERVER_JUPED)) - Me->GetLinks()[i - 1]->Delete(Me->GetName() + " " + Me->GetLinks()[i - 1]->GetName()); + ircdproto->SendSquit(Config->ServerName, quitmsg); - UplinkSock = NULL; + this->ProcessWrite(); // Write out the last bit + } - Me->SetFlag(SERVER_SYNCING); + for (unsigned i = Me->GetLinks().size(); i > 0; --i) + if (!Me->GetLinks()[i - 1]->HasFlag(SERVER_JUPED)) + Me->GetLinks()[i - 1]->Delete(Me->GetName() + " " + Me->GetLinks()[i - 1]->GetName()); - if (!quitting) - { - int Retry = Config->RetryWait; - if (Retry <= 0) - Retry = 60; + UplinkSock = NULL; - Log() << "Retrying in " << Retry << " seconds"; - new ReconnectTimer(Retry); - } - } + Me->SetFlag(SERVER_SYNCING); - bool Read(const Anope::string &buf) + if (!quitting) { - process(buf); - return true; - } + int Retry = Config->RetryWait; + if (Retry <= 0) + Retry = 60; - void OnConnect() - { - Log() << "Successfully connected to " << Config->Uplinks[CurrentUplink]->host << ":" << Config->Uplinks[CurrentUplink]->port; - ircdproto->SendConnect(); - FOREACH_MOD(I_OnServerConnect, OnServerConnect()); + Log() << "Retrying in " << Retry << " seconds"; + new ReconnectTimer(Retry); } +} - void OnError(const Anope::string &error) - { - Log() << "Unable to connect to server " << Config->Uplinks[CurrentUplink]->host << ":" << Config->Uplinks[CurrentUplink]->port << (!error.empty() ? (": " + error) : ""); - this->SetFlag(SF_DEAD); - } -}; +bool UplinkSocket::Read(const Anope::string &buf) +{ + process(buf); + return true; +} + +void UplinkSocket::OnConnect() +{ + Log() << "Successfully connected to " << Config->Uplinks[CurrentUplink]->host << ":" << Config->Uplinks[CurrentUplink]->port; + ircdproto->SendConnect(); + FOREACH_MOD(I_OnServerConnect, OnServerConnect()); +} + +void UplinkSocket::OnError(const Anope::string &error) +{ + Log() << "Unable to connect to server " << Config->Uplinks[CurrentUplink]->host << ":" << Config->Uplinks[CurrentUplink]->port << (!error.empty() ? (": " + error) : ""); +} static void Connect() { diff --git a/src/socket_clients.cpp b/src/socket_clients.cpp new file mode 100644 index 000000000..4a43629a4 --- /dev/null +++ b/src/socket_clients.cpp @@ -0,0 +1,96 @@ +/* + * + * (C) 2003-2011 Anope Team + * Contact us at team@anope.org + * + * Please read COPYING and README for further details. + * + * Based on the original code of Epona by Lara. + * Based on the original code of Services by Andy Church. + */ + +#include "services.h" + +ConnectionSocket::ConnectionSocket() : Socket() +{ +} + +void ConnectionSocket::Connect(const Anope::string &TargetHost, int Port) +{ + this->IO->Connect(this, TargetHost, Port); +} + +bool ConnectionSocket::Process() +{ + try + { + if (this->HasFlag(SF_CONNECTED)) + return true; + else if (this->HasFlag(SF_CONNECTING)) + this->SetFlag(this->IO->FinishConnect(this)); + else + this->SetFlag(SF_DEAD); + } + catch (const SocketException &ex) + { + Log() << ex.GetReason(); + } + return false; +} + +void ConnectionSocket::ProcessError() +{ + int optval = 0; + socklen_t optlen = sizeof(optval); + getsockopt(this->GetFD(), SOL_SOCKET, SO_ERROR, reinterpret_cast<char *>(&optval), &optlen); + errno = optval; + this->OnError(optval ? Anope::LastError() : ""); +} + +void ConnectionSocket::OnConnect() +{ +} + +void ConnectionSocket::OnError(const Anope::string &) +{ +} + +ClientSocket::ClientSocket(ListenSocket *ls, const sockaddrs &addr) : Socket(), LS(ls), clientaddr(addr) +{ +} + +bool ClientSocket::Process() +{ + try + { + if (this->HasFlag(SF_ACCEPTED)) + return true; + else if (this->HasFlag(SF_ACCEPTING)) + this->SetFlag(this->IO->FinishAccept(this)); + else + this->SetFlag(SF_DEAD); + } + catch (const SocketException &ex) + { + Log() << ex.GetReason(); + } + return false; +} + +void ClientSocket::ProcessError() +{ + int optval = 0; + socklen_t optlen = sizeof(optval); + getsockopt(this->GetFD(), SOL_SOCKET, SO_ERROR, reinterpret_cast<char *>(&optval), &optlen); + errno = optval; + this->OnError(optval ? Anope::LastError() : ""); +} + +void ClientSocket::OnAccept() +{ +} + +void ClientSocket::OnError(const Anope::string &error) +{ +} + diff --git a/src/socket_transport.cpp b/src/socket_transport.cpp new file mode 100644 index 000000000..a7b92ff20 --- /dev/null +++ b/src/socket_transport.cpp @@ -0,0 +1,185 @@ +/* + * + * (C) 2003-2011 Anope Team + * Contact us at team@anope.org + * + * Please read COPYING and README for further details. + * + * Based on the original code of Epona by Lara. + * Based on the original code of Services by Andy Church. + */ + +#include "services.h" + +BufferedSocket::BufferedSocket() +{ +} + +BufferedSocket::~BufferedSocket() +{ +} + +bool BufferedSocket::ProcessRead() +{ + char tbuffer[NET_BUFSIZE]; + + this->RecvLen = 0; + + int len = this->IO->Recv(this, tbuffer, sizeof(tbuffer) - 1); + if (len <= 0) + return false; + + tbuffer[len] = 0; + this->RecvLen = len; + + Anope::string sbuffer = this->extrabuf; + sbuffer += tbuffer; + this->extrabuf.clear(); + size_t lastnewline = sbuffer.rfind('\n'); + if (lastnewline == Anope::string::npos) + { + this->extrabuf = sbuffer; + return true; + } + if (lastnewline < sbuffer.length() - 1) + { + this->extrabuf = sbuffer.substr(lastnewline); + this->extrabuf.trim(); + sbuffer = sbuffer.substr(0, lastnewline); + } + + sepstream stream(sbuffer, '\n'); + + Anope::string tbuf; + while (stream.GetToken(tbuf)) + { + tbuf.trim(); + if (!tbuf.empty() && !Read(tbuf)) + return false; + } + + return true; +} + +bool BufferedSocket::ProcessWrite() +{ + int count = this->IO->Send(this, this->WriteBuffer); + if (count <= -1) + return false; + this->WriteBuffer = this->WriteBuffer.substr(count); + if (this->WriteBuffer.empty()) + SocketEngine::ClearWritable(this); + + return true; +} + +bool BufferedSocket::Read(const Anope::string &buf) +{ + return false; +} + +void BufferedSocket::Write(const char *message, ...) +{ + va_list vi; + char tbuffer[BUFSIZE]; + + if (!message) + return; + + va_start(vi, message); + vsnprintf(tbuffer, sizeof(tbuffer), message, vi); + va_end(vi); + + Anope::string sbuf = tbuffer; + Write(sbuf); +} + +void BufferedSocket::Write(const Anope::string &message) +{ + this->WriteBuffer += message + "\r\n"; + SocketEngine::MarkWritable(this); +} + +int BufferedSocket::ReadBufferLen() const +{ + return RecvLen; +} + +int BufferedSocket::WriteBufferLen() const +{ + return this->WriteBuffer.length(); +} + + +BinarySocket::DataBlock::DataBlock(const char *b, size_t l) +{ + this->buf = new char[l]; + memcpy(this->buf, b, l); + this->len = l; +} + +BinarySocket::DataBlock::~DataBlock() +{ + delete [] this->buf; +} + +BinarySocket::BinarySocket() +{ +} + +BinarySocket::~BinarySocket() +{ +} + +bool BinarySocket::ProcessRead() +{ + char tbuffer[NET_BUFSIZE]; + + int len = this->IO->Recv(this, tbuffer, sizeof(tbuffer)); + if (len <= 0) + return false; + + return this->Read(tbuffer, len); +} + +bool BinarySocket::ProcessWrite() +{ + if (this->WriteBuffer.empty()) + { + SocketEngine::ClearWritable(this); + return true; + } + + DataBlock *d = this->WriteBuffer.front(); + + int len = this->IO->Send(this, d->buf, d->len); + if (len <= -1) + return false; + else if (static_cast<size_t>(len) == d->len) + { + delete d; + this->WriteBuffer.pop_front(); + } + else + { + d->buf += len; + d->len -= len; + } + + if (this->WriteBuffer.empty()) + SocketEngine::ClearWritable(this); + + return true; +} + +void BinarySocket::Write(const char *buffer, size_t l) +{ + this->WriteBuffer.push_back(new DataBlock(buffer, l)); + SocketEngine::MarkWritable(this); +} + +bool BinarySocket::Read(const char *buffer, size_t l) +{ + return true; +} + diff --git a/src/socketengines/pipeengine_eventfd.cpp b/src/socketengines/pipeengine_eventfd.cpp index aa712de3d..21931bb1b 100644 --- a/src/socketengines/pipeengine_eventfd.cpp +++ b/src/socketengines/pipeengine_eventfd.cpp @@ -1,64 +1,29 @@ #include "services.h" #include <sys/eventfd.h> -class PipeIO : public SocketIO +Pipe::Pipe() : Socket(eventfd(0, EFD_NONBLOCK)) { - public: - /** Receive something from the buffer - * @param s The socket - * @param buf The buf to read to - * @param sz How much to read - * @return Number of bytes received - */ - int Recv(Socket *s, char *buf, size_t sz) - { - static eventfd_t dummy; - return !eventfd_read(s->GetFD(), &dummy); - } - - /** Write something to the socket - * @param s The socket - * @param buf What to write - * @return Number of bytes written - */ - int Send(Socket *s, const Anope::string &buf) - { - return !eventfd_write(s->GetFD(), 1); - } -} pipeSocketIO; - -Pipe::Pipe() : BufferedSocket() -{ - this->IO = &pipeSocketIO; - this->Sock = eventfd(0, EFD_NONBLOCK); if (this->Sock < 0) - throw CoreException(Anope::string("Could not create pipe: ") + Anope::LastError()); - - this->IPv6 = false; + throw CoreException("Could not create pipe: " + Anope::LastError()); SocketEngine::AddSocket(this); } -bool Pipe::ProcessRead() +Pipe::~Pipe() { - this->IO->Recv(this, NULL, 0); - return this->Read(""); } -bool Pipe::Read(const Anope::string &) +bool Pipe::ProcessRead() { + eventfd_t dummy; + eventfd_read(this->GetFD(), &dummy); this->OnNotify(); return true; } void Pipe::Notify() { - /* Note we send this immediatly. If use use Socket::Write and if this functions is called - * from a thread, only epoll is able to pick up the change to this sockets want flags immediately - * Other engines time out then pick up and write the change then read it back, which - * is too slow for most things. - */ - this->IO->Send(this, ""); + eventfd_write(this->GetFD(), 1); } void Pipe::OnNotify() diff --git a/src/socketengines/pipeengine_pipe.cpp b/src/socketengines/pipeengine_pipe.cpp index b3ca15d87..ceaded8ea 100644 --- a/src/socketengines/pipeengine_pipe.cpp +++ b/src/socketengines/pipeengine_pipe.cpp @@ -1,67 +1,40 @@ #include "services.h" -class PipeIO : public SocketIO -{ - public: - /** Receive something from the buffer - * @param s The socket - * @param buf The buf to read to - * @param sz How much to read - * @return Number of bytes received - */ - int Recv(Socket *s, char *buf, size_t sz) - { - static char dummy[512]; - while (read(s->GetFD(), &dummy, 512) == 512); - return 0; - } - - /** Write something to the socket - * @param s The socket - * @param buf What to write - * @return Number of bytes written - */ - int Send(Socket *s, const Anope::string &buf) - { - static const char dummy = '*'; - Pipe *pipe = debug_cast<Pipe *>(s); - return write(pipe->WritePipe, &dummy, 1); - } -} pipeSocketIO; - -Pipe::Pipe() : BufferedSocket() +Pipe::Pipe() : Socket(-1) { int fds[2]; if (pipe(fds)) - throw CoreException(Anope::string("Could not create pipe: ") + Anope::LastError()); + throw CoreException("Could not create pipe: " + Anope::LastError()); int flags = fcntl(fds[0], F_GETFL, 0); fcntl(fds[0], F_SETFL, flags | O_NONBLOCK); flags = fcntl(fds[1], F_GETFL, 0); fcntl(fds[1], F_SETFL, flags | O_NONBLOCK); - this->IO = &pipeSocketIO; + this->~Socket(); + this->Sock = fds[0]; this->WritePipe = fds[1]; - this->IPv6 = false; SocketEngine::AddSocket(this); } -bool Pipe::ProcessRead() +Pipe::~Pipe() { - this->IO->Recv(this, NULL, 0); - return this->Read(""); + CloseSocket(this->WritePipe); } -bool Pipe::Read(const Anope::string &) +bool Pipe::ProcessRead() { + char dummy[512]; + while (read(this->GetFD(), &dummy, 512) == 512); this->OnNotify(); return true; } void Pipe::Notify() { - this->IO->Send(this, ""); + const char dummy = '*'; + write(this->WritePipe, &dummy, 1); } void Pipe::OnNotify() diff --git a/src/socketengines/pipeengine_win32.cpp b/src/socketengines/pipeengine_win32.cpp index 3bd18d0cd..f34472870 100644 --- a/src/socketengines/pipeengine_win32.cpp +++ b/src/socketengines/pipeengine_win32.cpp @@ -1,89 +1,50 @@ #include "services.h" -static ClientSocket *newsocket = NULL; - -class LSocket : public ListenSocket -{ - public: - LSocket(const Anope::string &host, int port) : ListenSocket(host, port, false) { } - - ClientSocket *OnAccept(int fd, const sockaddrs &addr) - { - newsocket = new ClientSocket(this, fd, addr); - return newsocket; - } -}; - -class PipeIO : public SocketIO +Pipe::Pipe() : Socket(-1) { - public: - /** Receive something from the buffer - * @param s The socket - * @param buf The buf to read to - * @param sz How much to read - * @return Number of bytes received - */ - int Recv(Socket *s, char *buf, size_t sz) - { - static char dummy[512]; - return recv(s->GetFD(), dummy, 512, 0); - } - - /** Write something to the socket - * @param s The socket - * @param buf What to write - * @return Number of bytes written - */ - int Send(Socket *s, const Anope::string &buf) - { - static const char dummy = '*'; - Pipe *pipe = debug_cast<Pipe *>(s); - return send(pipe->WritePipe, &dummy, 1, 0); - } -} pipeSocketIO; + sockaddrs localhost; -Pipe::Pipe() : BufferedSocket() -{ - LSocket lfs("127.0.0.1", 0); + localhost.pton(AF_INET, "127.0.0.1"); - int cfd = socket(AF_INET, SOCK_STREAM, 0); + int cfd = socket(AF_INET, SOCK_STREAM, 0), lfd = socket(AF_INET, SOCK_STREAM, 0); if (cfd == -1) throw CoreException("Error accepting new socket for Pipe"); + + if (bind(lfd, &localhost.sa, localhost.size()) == -1) + throw CoreException("Error accepting new socket for Pipe"); + if (listen(lfd, 1) == -1) + throw CoreException("Error accepting new socket for Pipe"); - sockaddr_in addr; - socklen_t sz = sizeof(addr); - getsockname(lfs.GetFD(), reinterpret_cast<sockaddr *>(&addr), &sz); + sockaddrs lfd_addr; + socklen_t sz = sizeof(lfd_addr); + getsockname(lfd, &lfd_addr.sa, &sz); - if (connect(cfd, reinterpret_cast<sockaddr *>(&addr), sz)) - throw CoreException("Error accepting new socket for Pipe"); - lfs.ProcessRead(); - if (!newsocket) + if (connect(cfd, &lfd_addr.sa, lfd_addr.size())) throw CoreException("Error accepting new socket for Pipe"); + CloseSocket(lfd); - this->IO = &pipeSocketIO; - this->Sock = cfd; - this->WritePipe = newsocket->GetFD(); - this->IPv6 = false; + this->WritePipe = cfd; SocketEngine::AddSocket(this); - newsocket = NULL; } -bool Pipe::ProcessRead() +Pipe::~Pipe() { - this->IO->Recv(this, NULL, 0); - return this->Read(""); + CloseSocket(this->WritePipe); } -bool Pipe::Read(const Anope::string &) +bool Pipe::ProcessRead() { + char dummy[512]; + while (recv(this->GetFD(), dummy, 512, 0) == 512); this->OnNotify(); return true; } void Pipe::Notify() { - this->IO->Send(this, ""); + const char dummy = '*'; + send(this->WritePipe, &dummy, 1, 0); } void Pipe::OnNotify() diff --git a/src/socketengines/socketengine_epoll.cpp b/src/socketengines/socketengine_epoll.cpp index ad9bef3ce..73f7e10d6 100644 --- a/src/socketengines/socketengine_epoll.cpp +++ b/src/socketengines/socketengine_epoll.cpp @@ -11,18 +11,12 @@ void SocketEngine::Init() max = ulimit(4, 0); if (max <= 0) - { - Log() << "Can't determine maximum number of open sockets"; - throw CoreException("Can't determine maximum number of open sockets"); - } + throw SocketException("Can't determine maximum number of open sockets"); EngineHandle = epoll_create(max / 4); if (EngineHandle == -1) - { - Log() << "Could not initialize epoll socket engine: " << Anope::LastError(); - throw CoreException(Anope::string("Could not initialize epoll socket engine: ") + Anope::LastError()); - } + throw SocketException("Could not initialize epoll socket engine: " + Anope::LastError()); events = new epoll_event[max]; memset(events, 0, sizeof(epoll_event) * max); @@ -53,10 +47,7 @@ void SocketEngine::AddSocket(Socket *s) ev.data.fd = s->GetFD(); if (epoll_ctl(EngineHandle, EPOLL_CTL_ADD, ev.data.fd, &ev) == -1) - { - Log() << "Unable to add fd " << ev.data.fd << " to socketengine epoll: " << Anope::LastError(); - return; - } + throw SocketException("Unable to add fd " + stringify(ev.data.fd) + " to epoll: " + Anope::LastError()); Sockets.insert(std::make_pair(ev.data.fd, s)); } @@ -70,10 +61,7 @@ void SocketEngine::DelSocket(Socket *s) ev.data.fd = s->GetFD(); if (epoll_ctl(EngineHandle, EPOLL_CTL_DEL, ev.data.fd, &ev) == -1) - { - Log() << "Unable to delete fd " << ev.data.fd << " from socketengine epoll: " << Anope::LastError(); - return; - } + throw SocketException("Unable to remove fd " + stringify(ev.data.fd) + " from epoll: " + Anope::LastError()); Sockets.erase(ev.data.fd); } @@ -91,9 +79,9 @@ void SocketEngine::MarkWritable(Socket *s) ev.data.fd = s->GetFD(); if (epoll_ctl(EngineHandle, EPOLL_CTL_MOD, ev.data.fd, &ev) == -1) - Log() << "Unable to mark fd " << ev.data.fd << " as writable in socketengine epoll: " << Anope::LastError(); - else - s->SetFlag(SF_WRITABLE); + throw SocketException("Unable to mark fd " + stringify(ev.data.fd) + " as writable in epoll: " + Anope::LastError()); + + s->SetFlag(SF_WRITABLE); } void SocketEngine::ClearWritable(Socket *s) @@ -109,9 +97,9 @@ void SocketEngine::ClearWritable(Socket *s) ev.data.fd = s->GetFD(); if (epoll_ctl(EngineHandle, EPOLL_CTL_MOD, ev.data.fd, &ev) == -1) - Log() << "Unable to mark fd " << ev.data.fd << " as unwritable in socketengine epoll: " << Anope::LastError(); - else - s->UnsetFlag(SF_WRITABLE); + throw SocketException("Unable clear mark fd " + stringify(ev.data.fd) + " as writable in epoll: " + Anope::LastError()); + + s->UnsetFlag(SF_WRITABLE); } void SocketEngine::Process() @@ -130,10 +118,15 @@ void SocketEngine::Process() for (int i = 0; i < total; ++i) { epoll_event *ev = &events[i]; - Socket *s = Sockets[ev->data.fd]; + + std::map<int, Socket *>::iterator it = Sockets.find(ev->data.fd); + if (it == Sockets.end()) + continue; + Socket *s = it->second; if (s->HasFlag(SF_DEAD)) continue; + if (ev->events & (EPOLLHUP | EPOLLERR)) { s->ProcessError(); @@ -141,6 +134,9 @@ void SocketEngine::Process() continue; } + if (!s->Process()) + continue; + if ((ev->events & EPOLLIN) && !s->ProcessRead()) s->SetFlag(SF_DEAD); @@ -151,7 +147,10 @@ void SocketEngine::Process() for (int i = 0; i < total; ++i) { epoll_event *ev = &events[i]; - Socket *s = Sockets[ev->data.fd]; + std::map<int, Socket *>::iterator it = Sockets.find(ev->data.fd); + if (it == Sockets.end()) + continue; + Socket *s = it->second; if (s->HasFlag(SF_DEAD)) delete s; diff --git a/src/socketengines/socketengine_poll.cpp b/src/socketengines/socketengine_poll.cpp index 278485fa7..ba111659a 100644 --- a/src/socketengines/socketengine_poll.cpp +++ b/src/socketengines/socketengine_poll.cpp @@ -50,10 +50,7 @@ void SocketEngine::Shutdown() void SocketEngine::AddSocket(Socket *s) { if (SocketCount == max) - { - Log() << "Unable to add fd " << s->GetFD() << " to socketengine poll, engine is full"; - return; - } + throw SocketException("Unable to add fd " + stringify(s->GetFD()) + " to poll: " + Anope::LastError()); pollfd *ev = &events[SocketCount]; ev->fd = s->GetFD(); @@ -70,10 +67,7 @@ void SocketEngine::DelSocket(Socket *s) { std::map<int, int>::iterator pos = socket_positions.find(s->GetFD()); if (pos == socket_positions.end()) - { - Log() << "Unable to delete unknown fd " << s->GetFD() << " from socketengine poll"; - return; - } + throw SocketException("Unable to remove fd " + stringify(s->GetFD()) + " from poll"); if (pos->second != SocketCount - 1) { @@ -100,10 +94,7 @@ void SocketEngine::MarkWritable(Socket *s) std::map<int, int>::iterator pos = socket_positions.find(s->GetFD()); if (pos == socket_positions.end()) - { - Log() << "Unable to mark unknown fd " << s->GetFD() << " as writable"; - return; - } + throw SocketException("Unable to mark fd " + stringify(s->GetFD()) + " as writable in poll"); pollfd *ev = &events[pos->second]; ev->events |= POLLOUT; @@ -118,10 +109,7 @@ void SocketEngine::ClearWritable(Socket *s) std::map<int, int>::iterator pos = socket_positions.find(s->GetFD()); if (pos == socket_positions.end()) - { - Log() << "Unable to mark unknown fd " << s->GetFD() << " as writable"; - return; - } + throw SocketException("Unable clear mark fd " + stringify(s->GetFD()) + " as writable in poll"); pollfd *ev = &events[pos->second]; ev->events &= ~POLLOUT; @@ -149,10 +137,14 @@ void SocketEngine::Process() if (ev->revents != 0) ++processed; - Socket *s = Sockets[ev->fd]; + std::map<int, Socket *>::iterator it = Sockets.find(ev->fd); + if (it == Sockets.end()) + continue; + Socket *s = it->second; if (s->HasFlag(SF_DEAD)) continue; + if (ev->revents & (POLLERR | POLLRDHUP)) { s->ProcessError(); @@ -160,6 +152,9 @@ void SocketEngine::Process() continue; } + if (!s->Process()) + continue; + if ((ev->revents & POLLIN) && !s->ProcessRead()) s->SetFlag(SF_DEAD); @@ -170,7 +165,10 @@ void SocketEngine::Process() for (int i = 0; i < SocketCount; ++i) { pollfd *ev = &events[i]; - Socket *s = Sockets[ev->fd]; + std::map<int, Socket *>::iterator it = Sockets.find(ev->fd); + if (it == Sockets.end()) + continue; + Socket *s = it->second; if (s->HasFlag(SF_DEAD)) delete s; diff --git a/src/socketengines/socketengine_select.cpp b/src/socketengines/socketengine_select.cpp index 34a86f680..ad6e04ca9 100644 --- a/src/socketengines/socketengine_select.cpp +++ b/src/socketengines/socketengine_select.cpp @@ -96,19 +96,27 @@ void SocketEngine::Process() { Socket *s = it->second; - if (FD_ISSET(s->GetFD(), &efdset) || FD_ISSET(s->GetFD(), &rfdset) || FD_ISSET(s->GetFD(), &wfdset)) + bool has_read = FD_ISSET(s->GetFD(), &rfdset), has_write = FD_ISSET(s->GetFD(), &wfdset), has_error = FD_ISSET(s->GetFD(), &efdset); + if (has_read || has_write || has_error) ++processed; + if (s->HasFlag(SF_DEAD)) continue; - if (FD_ISSET(s->GetFD(), &efdset)) + + if (has_error) { s->ProcessError(); s->SetFlag(SF_DEAD); continue; } - if (FD_ISSET(s->GetFD(), &rfdset) && !s->ProcessRead()) + + if (!s->Process()) + continue; + + if (has_read && !s->ProcessRead()) s->SetFlag(SF_DEAD); - if (FD_ISSET(s->GetFD(), &wfdset) && !s->ProcessWrite()) + + if (has_write && !s->ProcessWrite()) s->SetFlag(SF_DEAD); } diff --git a/src/sockets.cpp b/src/sockets.cpp index b7f0a5f9a..541e3bb3a 100644 --- a/src/sockets.cpp +++ b/src/sockets.cpp @@ -260,15 +260,19 @@ int SocketIO::Recv(Socket *s, char *buf, size_t sz) /** Write something to the socket * @param s The socket - * @param buf What to write - * @return Number of bytes written + * @param buf The data to write + * @param size The length of the data */ -int SocketIO::Send(Socket *s, const Anope::string &buf) +int SocketIO::Send(Socket *s, const char *buf, size_t sz) { - size_t i = send(s->GetFD(), buf.c_str(), buf.length(), 0); + size_t i = send(s->GetFD(), buf, sz, 0); TotalWritten += i; return i; } +int SocketIO::Send(Socket *s, const Anope::string &buf) +{ + return this->Send(s, buf.c_str(), buf.length()); +} /** Accept a connection from a socket * @param s The socket @@ -282,22 +286,27 @@ ClientSocket *SocketIO::Accept(ListenSocket *s) int newsock = accept(s->GetFD(), &conaddr.sa, &size); #ifndef INVALID_SOCKET -# define INVALID_SOCKET -1 + static const int INVALID_SOCKET = -1; #endif if (newsock >= 0 && newsock != INVALID_SOCKET) - return s->OnAccept(newsock, conaddr); + { + ClientSocket *ns = s->OnAccept(newsock, conaddr); + ns->SetFlag(SF_ACCEPTED); + ns->OnAccept(); + return ns; + } else throw SocketException("Unable to accept connection: " + Anope::LastError()); } -/** Check if a connection has been accepted - * @param s The client socket - * @return -1 on error, 0 to wait, 1 on success +/** Finished accepting a connection from a socket + * @param s The socket + * @return SF_ACCEPTED if accepted, SF_ACCEPTING if still in process, SF_DEAD on error */ -int SocketIO::Accepted(ClientSocket *cs) +SocketFlag SocketIO::FinishAccept(ClientSocket *cs) { - return 1; + return SF_ACCEPTED; } /** Bind a socket @@ -319,6 +328,8 @@ void SocketIO::Bind(Socket *s, const Anope::string &ip, int port) */ void SocketIO::Connect(ConnectionSocket *s, const Anope::string &target, int port) { + s->UnsetFlag(SF_CONNECTING); + s->UnsetFlag(SF_CONNECTED); s->conaddr.pton(s->IsIPv6() ? AF_INET6 : AF_INET, target, port); int c = connect(s->GetFD(), &s->conaddr.sa, s->conaddr.size()); if (c == -1) @@ -326,30 +337,51 @@ void SocketIO::Connect(ConnectionSocket *s, const Anope::string &target, int por if (Anope::LastErrorCode() != EINPROGRESS) s->OnError(Anope::LastError()); else + { SocketEngine::MarkWritable(s); + s->SetFlag(SF_CONNECTING); + } } else { - s->connected = true; + s->SetFlag(SF_CONNECTED); s->OnConnect(); } } -/** Check if this socket is connected +/** Called to potentially finish a pending connection * @param s The socket - * @return -1 for error, 0 for wait, 1 for connected + * @return SF_CONNECTED on success, SF_CONNECTING if still pending, and SF_DEAD on error. */ -int SocketIO::Connected(ConnectionSocket *s) +SocketFlag SocketIO::FinishConnect(ConnectionSocket *s) { - return s->connected == true ? 1 : -1; + if (s->HasFlag(SF_CONNECTED)) + return SF_CONNECTED; + else if (!s->HasFlag(SF_CONNECTING)) + throw SocketException("SocketIO::FinishConnect called for a socket not connected nor connecting?"); + + int optval = 0; + socklen_t optlen = sizeof(optval); + if (!getsockopt(s->GetFD(), SOL_SOCKET, SO_ERROR, reinterpret_cast<char *>(&optval), &optlen) && !optval) + { + s->SetFlag(SF_CONNECTED); + s->UnsetFlag(SF_CONNECTING); + s->OnConnect(); + return SF_CONNECTED; + } + else + { + errno = optval; + s->ProcessError(); + return SF_DEAD; + } } -/** Empty constructor, used for things such as the pipe socket +/** Empty constructor, should not be called. */ -Socket::Socket() : Flags<SocketFlag, 2>(SocketFlagStrings) +Socket::Socket() : Flags<SocketFlag>(SocketFlagStrings) { - this->Type = SOCKTYPE_BASE; - this->IO = &normalSocketIO; + throw CoreException("Socket::Socket() ?"); } /** Constructor @@ -357,12 +389,11 @@ Socket::Socket() : Flags<SocketFlag, 2>(SocketFlagStrings) * @param ipv6 IPv6? * @param type The socket type, defaults to SOCK_STREAM */ -Socket::Socket(int sock, bool ipv6, int type) : Flags<SocketFlag, 2>(SocketFlagStrings) +Socket::Socket(int sock, bool ipv6, int type) : Flags<SocketFlag>(SocketFlagStrings) { - this->Type = SOCKTYPE_BASE; this->IO = &normalSocketIO; this->IPv6 = ipv6; - if (sock == 0) + if (sock == -1) this->Sock = socket(this->IPv6 ? AF_INET6 : AF_INET, type, 0); else this->Sock = sock; @@ -432,163 +463,35 @@ void Socket::Bind(const Anope::string &ip, int port) this->IO->Bind(this, ip, port); } -/** Called when there is something to be received for this socket - * @return true on success, false to drop this socket +/** Called when there either is a read or write event. + * @return true to continue to call ProcessRead/ProcessWrite, false to not continue */ -bool Socket::ProcessRead() +bool Socket::Process() { return true; } -/** Called when the socket is ready to be written to - * @return true on success, false to drop this socket - */ -bool Socket::ProcessWrite() -{ - return true; -} - -/** Called when there is an error for this socket - * @return true on success, false to drop this socket - */ -void Socket::ProcessError() -{ -} - -/** Constructor for pipe socket - */ -BufferedSocket::BufferedSocket() : Socket() -{ - this->Type = SOCKTYPE_BUFFERED; -} - -/** Constructor - * @param fd FD to use - * @param ipv6 true for ipv6 - * @param type socket type, defaults to SOCK_STREAM - */ -BufferedSocket::BufferedSocket(int fd, bool ipv6, int type) : Socket(fd, ipv6, type) -{ - this->Type = SOCKTYPE_BUFFERED; -} - -/** Default destructor - */ -BufferedSocket::~BufferedSocket() -{ -} - /** Called when there is something to be received for this socket * @return true on success, false to drop this socket */ -bool BufferedSocket::ProcessRead() +bool Socket::ProcessRead() { - char tbuffer[NET_BUFSIZE]; - - this->RecvLen = 0; - - int len = this->IO->Recv(this, tbuffer, sizeof(tbuffer) - 1); - if (len <= 0) - return false; - - tbuffer[len] = 0; - this->RecvLen = len; - - Anope::string sbuffer = this->extrabuf; - sbuffer += tbuffer; - this->extrabuf.clear(); - size_t lastnewline = sbuffer.rfind('\n'); - if (lastnewline == Anope::string::npos) - { - this->extrabuf = sbuffer; - return true; - } - if (lastnewline < sbuffer.length() - 1) - { - this->extrabuf = sbuffer.substr(lastnewline); - this->extrabuf.trim(); - sbuffer = sbuffer.substr(0, lastnewline); - } - - sepstream stream(sbuffer, '\n'); - - Anope::string tbuf; - while (stream.GetToken(tbuf)) - { - tbuf.trim(); - if (!tbuf.empty() && !Read(tbuf)) - return false; - } - return true; } /** Called when the socket is ready to be written to * @return true on success, false to drop this socket */ -bool BufferedSocket::ProcessWrite() +bool Socket::ProcessWrite() { - int count = this->IO->Send(this, this->WriteBuffer); - if (count <= -1) - return false; - this->WriteBuffer = this->WriteBuffer.substr(count); - if (this->WriteBuffer.empty()) - SocketEngine::ClearWritable(this); - return true; } -/** Called with a line received from the socket - * @param buf The line - * @return true to continue reading, false to drop the socket - */ -bool BufferedSocket::Read(const Anope::string &buf) -{ - return false; -} - -/** Write to the socket - * @param message The message - */ -void BufferedSocket::Write(const char *message, ...) -{ - va_list vi; - char tbuffer[BUFSIZE]; - - if (!message) - return; - - va_start(vi, message); - vsnprintf(tbuffer, sizeof(tbuffer), message, vi); - va_end(vi); - - Anope::string sbuf = tbuffer; - Write(sbuf); -} - -/** Write to the socket - * @param message The message - */ -void BufferedSocket::Write(const Anope::string &message) -{ - this->WriteBuffer += message + "\r\n"; - SocketEngine::MarkWritable(this); -} - -/** Get the length of the read buffer - * @return The length of the read buffer - */ -int BufferedSocket::ReadBufferLen() const -{ - return RecvLen; -} - -/** Get the length of the write buffer - * @return The length of the write buffer +/** Called when there is an error for this socket + * @return true on success, false to drop this socket */ -int BufferedSocket::WriteBufferLen() const +void Socket::ProcessError() { - return this->WriteBuffer.length(); } /** Constructor @@ -596,16 +499,20 @@ int BufferedSocket::WriteBufferLen() const * @param port The port to listen on * @param ipv6 true for ipv6 */ -ListenSocket::ListenSocket(const Anope::string &bindip, int port, bool ipv6) : Socket(0, ipv6) +ListenSocket::ListenSocket(const Anope::string &bindip, int port, bool ipv6) : Socket(-1, ipv6) { - this->Type = SOCKTYPE_LISTEN; this->SetNonBlocking(); +#ifndef _WIN32 + int op = 1; + setsockopt(this->GetFD(), SOL_SOCKET, SO_REUSEADDR, &op, sizeof(op)); +#endif + this->bindaddr.pton(IPv6 ? AF_INET6 : AF_INET, bindip, port); this->IO->Bind(this, bindip, port); if (listen(Sock, SOMAXCONN) == -1) - throw SocketException(Anope::string("Unable to listen: ") + Anope::LastError()); + throw SocketException("Unable to listen: " + Anope::LastError()); } /** Destructor @@ -629,148 +536,3 @@ bool ListenSocket::ProcessRead() return true; } -/** Called when a connection is accepted - * @param fd The FD for the new connection - * @param addr The sockaddr for where the connection came from - * @return The new socket - */ -ClientSocket *ListenSocket::OnAccept(int fd, const sockaddrs &addr) -{ - return new ClientSocket(this, fd, addr); -} - -/** Constructor - * @param ipv6 true to use IPv6 - * @param type The socket type, defaults to SOCK_STREAM - */ -ConnectionSocket::ConnectionSocket(bool ipv6, int type) : BufferedSocket(0, ipv6, type), connected(false) -{ - this->Type = SOCKTYPE_CONNECTION; -} - -/** Connect the socket - * @param TargetHost The target host to connect to - * @param Port The target port to connect to - */ -void ConnectionSocket::Connect(const Anope::string &TargetHost, int Port) -{ - this->IO->Connect(this, TargetHost, Port); -} - -/** Called when there is something to be received for this socket - * @return true on success, false to drop this socket - */ -bool ConnectionSocket::ProcessRead() -{ - if (!this->connected) - { - int optval = 0; - socklen_t optlen = sizeof(optval); - if (!getsockopt(this->GetFD(), SOL_SOCKET, SO_ERROR, reinterpret_cast<char *>(&optval), &optlen) && !optval) - { - this->connected = true; - this->OnConnect(); - } - else - errno = optval; - } - - int i = this->IO->Connected(this); - if (i == 1) - return BufferedSocket::ProcessRead(); - else if (i == 0) - return true; - - this->OnError(Anope::LastError()); - return false; -} - -/** Called when the socket is ready to be written to - * @return true on success, false to drop this socket - */ -bool ConnectionSocket::ProcessWrite() -{ - if (!this->connected) - { - int optval = 0; - socklen_t optlen = sizeof(optval); - if (!getsockopt(this->GetFD(), SOL_SOCKET, SO_ERROR, reinterpret_cast<char *>(&optval), &optlen) && !optval) - { - this->connected = true; - this->OnConnect(); - } - else - errno = optval; - } - - int i = this->IO->Connected(this); - if (i == 1) - return BufferedSocket::ProcessWrite(); - else if (i == 0) - return true; - - this->OnError(Anope::LastError()); - return false; -} - -/** Called when there is an error for this socket - * @return true on success, false to drop this socket - */ -void ConnectionSocket::ProcessError() -{ - int optval = 0; - socklen_t optlen = sizeof(optval); - getsockopt(this->GetFD(), SOL_SOCKET, SO_ERROR, reinterpret_cast<char *>(&optval), &optlen); - errno = optval; - this->OnError(optval ? Anope::LastError() : ""); -} - -/** Called on a successful connect - */ -void ConnectionSocket::OnConnect() -{ -} - -/** Called when a connection is not successful - * @param error The error - */ -void ConnectionSocket::OnError(const Anope::string &error) -{ -} - -/** Constructor - * @param ls Listen socket this connection is from - * @param fd New FD for this socket - * @param addr Address the connection came from - */ -ClientSocket::ClientSocket(ListenSocket *ls, int fd, const sockaddrs &addr) : BufferedSocket(fd, ls->IsIPv6()), LS(ls), clientaddr(addr) -{ - this->Type = SOCKTYPE_CLIENT; -} - -/** Called when there is something to be received for this socket - * @return true on success, false to drop this socket - */ -bool ClientSocket::ProcessRead() -{ - int i = this->IO->Accepted(this); - if (i == 1) - return BufferedSocket::ProcessRead(); - else if (i == 0) - return true; - return false; -} - -/** Called when the socket is ready to be written to - * @return true on success, false to drop this socket - */ -bool ClientSocket::ProcessWrite() -{ - int i = this->IO->Accepted(this); - if (i == 1) - return BufferedSocket::ProcessWrite(); - else if (i == 0) - return true; - return false; -} - |