summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/extern.h11
-rw-r--r--include/sockets.h169
-rw-r--r--modules/extra/m_dnsbl.cpp13
-rw-r--r--modules/extra/m_ssl.cpp202
-rw-r--r--modules/extra/m_xmlrpc.cpp6
-rw-r--r--modules/extra/xmlrpc.h4
-rw-r--r--src/main.cpp103
-rw-r--r--src/socket_clients.cpp96
-rw-r--r--src/socket_transport.cpp185
-rw-r--r--src/socketengines/pipeengine_eventfd.cpp49
-rw-r--r--src/socketengines/pipeengine_pipe.cpp49
-rw-r--r--src/socketengines/pipeengine_win32.cpp83
-rw-r--r--src/socketengines/socketengine_epoll.cpp47
-rw-r--r--src/socketengines/socketengine_poll.cpp34
-rw-r--r--src/socketengines/socketengine_select.cpp16
-rw-r--r--src/sockets.cpp376
16 files changed, 728 insertions, 715 deletions
diff --git a/include/extern.h b/include/extern.h
index 0b90c6e14..1772c965c 100644
--- a/include/extern.h
+++ b/include/extern.h
@@ -106,7 +106,16 @@ E bool restarting;
E Anope::string quitmsg;
E time_t start_time;
-E ConnectionSocket *UplinkSock;
+class UplinkSocket : public ConnectionSocket, public BufferedSocket
+{
+ public:
+ UplinkSocket();
+ ~UplinkSocket();
+ bool Read(const Anope::string &);
+ void OnConnect();
+ void OnError(const Anope::string &);
+};
+E UplinkSocket *UplinkSock;
E int CurrentUplink;
E void save_databases();
diff --git a/include/sockets.h b/include/sockets.h
index eb86130b5..b8d6033ee 100644
--- a/include/sockets.h
+++ b/include/sockets.h
@@ -106,22 +106,17 @@ class SocketException : public CoreException
virtual ~SocketException() throw() { }
};
-enum SocketType
-{
- SOCKTYPE_BASE,
- SOCKTYPE_BUFFERED,
- SOCKTYPE_CONNECTION,
- SOCKTYPE_CLIENT,
- SOCKTYPE_LISTEN
-};
-
enum SocketFlag
{
SF_DEAD,
- SF_WRITABLE
+ SF_WRITABLE,
+ SF_CONNECTING,
+ SF_CONNECTED,
+ SF_ACCEPTING,
+ SF_ACCEPTED
};
-static const Anope::string SocketFlagStrings[] = { "SF_DEAD", "SF_WRITABLE", "" };
+static const Anope::string SocketFlagStrings[] = { "SF_DEAD", "SF_WRITABLE", "SF_CONNECTING", "SF_CONNECTED", "SF_ACCEPTING", "SF_ACCEPTED", "" };
class Socket;
class ClientSocket;
@@ -140,11 +135,12 @@ class CoreExport SocketIO
virtual int Recv(Socket *s, char *buf, size_t sz);
/** Write something to the socket
- * @param s The socket
- * @param buf What to write
- * @return Number of bytes written
+ * @param s The socket
+ * @param buf The data to write
+ * @param size The length of the data
*/
- virtual int Send(Socket *s, const Anope::string &buf);
+ virtual int Send(Socket *s, const char *buf, size_t sz);
+ int Send(Socket *s, const Anope::string &buf);
/** Accept a connection from a socket
* @param s The socket
@@ -152,11 +148,11 @@ class CoreExport SocketIO
*/
virtual ClientSocket *Accept(ListenSocket *s);
- /** Check if a connection has been accepted
- * @param s The client socket
- * @return -1 on error, 0 to wait, 1 on success
+ /** Finished accepting a connection from a socket
+ * @param s The socket
+ * @return SF_ACCEPTED if accepted, SF_ACCEPTING if still in process, SF_DEAD on error
*/
- virtual int Accepted(ClientSocket *cs);
+ virtual SocketFlag FinishAccept(ClientSocket *cs);
/** Bind a socket
* @param s The socket
@@ -172,18 +168,18 @@ class CoreExport SocketIO
*/
virtual void Connect(ConnectionSocket *s, const Anope::string &target, int port);
- /** Check if this socket is connected
+ /** Called to potentially finish a pending connection
* @param s The socket
- * @return -1 for error, 0 for wait, 1 for connected
+ * @return SF_CONNECTED on success, SF_CONNECTING if still pending, and SF_DEAD on error.
*/
- virtual int Connected(ConnectionSocket *s);
+ virtual SocketFlag FinishConnect(ConnectionSocket *s);
/** Called when the socket is destructing
*/
virtual void Destroy() { }
};
-class CoreExport Socket : public Flags<SocketFlag, 2>
+class CoreExport Socket : public Flags<SocketFlag>
{
protected:
/* Socket FD */
@@ -198,19 +194,16 @@ class CoreExport Socket : public Flags<SocketFlag, 2>
/* I/O functions used for this socket */
SocketIO *IO;
- /* Type this socket is */
- SocketType Type;
-
- /** Empty constructor, used for things such as the pipe socket
+ /** Empty constructor, should not be called.
*/
Socket();
/** Default constructor
- * @param sock The socket to use, 0 if we need to create our own
+ * @param sock The socket to use, -1 if we need to create our own
* @param ipv6 true if using ipv6
* @param type The socket type, defaults to SOCK_STREAM
*/
- Socket(int sock, bool ipv6, int type = SOCK_STREAM);
+ Socket(int sock, bool ipv6 = false, int type = SOCK_STREAM);
/** Default destructor
*/
@@ -242,6 +235,11 @@ class CoreExport Socket : public Flags<SocketFlag, 2>
*/
void Bind(const Anope::string &ip, int port = 0);
+ /** Called when there either is a read or write event.
+ * @return true to continue to call ProcessRead/ProcessWrite, false to not continue
+ */
+ virtual bool Process();
+
/** Called when there is something to be received for this socket
* @return true on success, false to drop this socket
*/
@@ -258,7 +256,7 @@ class CoreExport Socket : public Flags<SocketFlag, 2>
virtual void ProcessError();
};
-class CoreExport BufferedSocket : public Socket
+class CoreExport BufferedSocket : public virtual Socket
{
protected:
/* Things to be written to the socket */
@@ -269,16 +267,9 @@ class CoreExport BufferedSocket : public Socket
int RecvLen;
public:
- /** Blank constructor
- */
- BufferedSocket();
-
/** Constructor
- * @param fd FD to use
- * @param ipv6 true for ipv6
- * @param type socket type, defaults to SOCK_STREAM
*/
- BufferedSocket(int fd, bool ipv6, int type = SOCK_STREAM);
+ BufferedSocket();
/** Default destructor
*/
@@ -317,6 +308,52 @@ class CoreExport BufferedSocket : public Socket
int WriteBufferLen() const;
};
+class CoreExport BinarySocket : public virtual Socket
+{
+ struct DataBlock
+ {
+ char *buf;
+ size_t len;
+
+ DataBlock(const char *b, size_t l);
+ ~DataBlock();
+ };
+
+ std::deque<DataBlock *> WriteBuffer;
+
+ public:
+ /** Constructor
+ */
+ BinarySocket();
+
+ /** Default destructor
+ */
+ virtual ~BinarySocket();
+
+ /** Called when there is something to be received for this socket
+ * @return true on success, false to drop this socket
+ */
+ bool ProcessRead();
+
+ /** Called when the socket is ready to be written to
+ * @return true on success, false to drop this socket
+ */
+ bool ProcessWrite();
+
+ /** Write data to the socket
+ * @param buffer The data to write
+ * @param l The length of the data
+ */
+ void Write(const char *buffer, size_t l);
+
+ /** Called with data from the socket
+ * @param buffer The data
+ * @param l The length of buffer
+ * @return true to continue reading, false to drop the socket
+ */
+ virtual bool Read(const char *buffer, size_t l);
+};
+
class CoreExport ListenSocket : public Socket
{
public:
@@ -341,22 +378,18 @@ class CoreExport ListenSocket : public Socket
* @param addr The sockaddr for where the connection came from
* @return The new socket
*/
- virtual ClientSocket *OnAccept(int fd, const sockaddrs &addr);
+ virtual ClientSocket *OnAccept(int fd, const sockaddrs &addr) = 0;
};
-class CoreExport ConnectionSocket : public BufferedSocket
+class CoreExport ConnectionSocket : public virtual Socket
{
public:
/* Sockaddrs for connection ip/port */
sockaddrs conaddr;
- /* True if connected */
- bool connected;
/** Constructor
- * @param ipv6 true to use IPv6
- * @param type The socket type, defaults to SOCK_STREAM
*/
- ConnectionSocket(bool ipv6 = false, int type = SOCK_STREAM);
+ ConnectionSocket();
/** Connect the socket
* @param TargetHost The target host to connect to
@@ -364,15 +397,11 @@ class CoreExport ConnectionSocket : public BufferedSocket
*/
void Connect(const Anope::string &TargetHost, int Port);
- /** Called when there is something to be received for this socket
- * @return true on success, false to drop this socket
- */
- bool ProcessRead();
-
- /** Called when the socket is ready to be written to
- * @return true on success, false to drop this socket
+ /** Called when there either is a read or write event.
+ * Used to determine whether or not this socket is connected yet.
+ * @return true to continue to call ProcessRead/ProcessWrite, false to not continue
*/
- bool ProcessWrite();
+ bool Process();
/** Called when there is an error for this socket
* @return true on success, false to drop this socket
@@ -389,7 +418,7 @@ class CoreExport ConnectionSocket : public BufferedSocket
virtual void OnError(const Anope::string &error);
};
-class CoreExport ClientSocket : public BufferedSocket
+class CoreExport ClientSocket : public virtual Socket
{
public:
/* Listen socket this connection came from */
@@ -399,23 +428,31 @@ class CoreExport ClientSocket : public BufferedSocket
/** Constructor
* @param ls Listen socket this connection is from
- * @param fd New FD for this socket
* @param addr Address the connection came from
*/
- ClientSocket(ListenSocket *ls, int fd, const sockaddrs &addr);
+ ClientSocket(ListenSocket *ls, const sockaddrs &addr);
- /** Called when there is something to be received for this socket
- * @return true on success, false to drop this socket
+ /** Called when there either is a read or write event.
+ * Used to determine whether or not this socket is connected yet.
+ * @return true to continue to call ProcessRead/ProcessWrite, false to not continue
*/
- bool ProcessRead();
+ bool Process();
- /** Called when the socket is ready to be written to
+ /** Called when there is an error for this socket
* @return true on success, false to drop this socket
*/
- bool ProcessWrite();
+ void ProcessError();
+
+ /** Called when a client has been accepted() successfully.
+ */
+ virtual void OnAccept();
+
+ /** Called when there was an error accepting the client
+ */
+ virtual void OnError(const Anope::string &error);
};
-class CoreExport Pipe : public BufferedSocket
+class CoreExport Pipe : public Socket
{
public:
/** The FD of the write pipe (if this isn't evenfd)
@@ -427,13 +464,13 @@ class CoreExport Pipe : public BufferedSocket
*/
Pipe();
- /** Called when data is to be read
+ /** Destructor
*/
- bool ProcessRead();
+ ~Pipe();
- /** Function that calls OnNotify
+ /** Called when data is to be read
*/
- bool Read(const Anope::string &);
+ bool ProcessRead();
/** Called when this pipe needs to be woken up
*/
diff --git a/modules/extra/m_dnsbl.cpp b/modules/extra/m_dnsbl.cpp
index f62908b39..7e5342d3c 100644
--- a/modules/extra/m_dnsbl.cpp
+++ b/modules/extra/m_dnsbl.cpp
@@ -56,20 +56,19 @@ class DNSBLResolver : public DNSRequest
reason = reason.replace_all_cs("%r", record_reason);
reason = reason.replace_all_cs("%N", Config->NetworkName);
- XLine *x = NULL;
BotInfo *operserv = findbot(Config->OperServ);
- if (this->add_to_akill && akills && (x = akills->Add(Anope::string("*@") + user->host, Config->OperServ, Anope::CurTime + this->blacklist.bantime, reason)))
+ Log(operserv) << "DNSBL: " << user->GetMask() << " appears in " << this->blacklist.name;
+ if (this->add_to_akill && akills)
{
- Log(operserv) << "DNSBL: " << user->GetMask() << " appears in " << this->blacklist.name;
+ XLine *x = akills->Add("*@" + user->host, Config->OperServ, Anope::CurTime + this->blacklist.bantime, reason);
/* If AkillOnAdd is disabled send it anyway, noone wants bots around... */
if (!Config->AkillOnAdd)
- ircdproto->SendAkill(user, x);
+ akills->Send(NULL, x);
}
else
{
- Log(operserv) << "DNSBL: " << user->GetMask() << " appears in " << this->blacklist.name << "(" << reason << ")";
- XLine xline(Anope::string("*@") + user->host, Config->OperServ, Anope::CurTime + this->blacklist.bantime, reason);
- ircdproto->SendAkill(user, &xline);
+ XLine xline("*@" + user->host, Config->OperServ, Anope::CurTime + this->blacklist.bantime, reason);
+ ircdproto->SendAkill(NULL, &xline);
}
}
};
diff --git a/modules/extra/m_ssl.cpp b/modules/extra/m_ssl.cpp
index 41bc49993..5143d964a 100644
--- a/modules/extra/m_ssl.cpp
+++ b/modules/extra/m_ssl.cpp
@@ -31,8 +31,6 @@ class SSLSocketIO : public SocketIO
public:
/* The SSL socket for this socket */
SSL *sslsock;
- /* -1 if not, 0 if waiting, 1 if true */
- int connected, accepted;
/** Constructor
*/
@@ -46,12 +44,12 @@ class SSLSocketIO : public SocketIO
*/
int Recv(Socket *s, char *buf, size_t sz);
- /** Really write something to the socket
- * @param s The socket
- * @param buf What to write
- * @return Number of bytes written
+ /** Write something to the socket
+ * @param s The socket
+ * @param buf The data to write
+ * @param size The length of the data
*/
- int Send(Socket *s, const Anope::string &buf);
+ int Send(Socket *s, const char *buf, size_t sz);
/** Accept a connection from a socket
* @param s The socket
@@ -59,11 +57,11 @@ class SSLSocketIO : public SocketIO
*/
ClientSocket *Accept(ListenSocket *s);
- /** Check if a connection has been accepted
- * @param s The client socket
- * @return -1 on error, 0 to wait, 1 on success
+ /** Finished accepting a connection from a socket
+ * @param s The socket
+ * @return SF_ACCEPTED if accepted, SF_ACCEPTING if still in process, SF_DEAD on error
*/
- int Accepted(ClientSocket *cs);
+ SocketFlag FinishAccept(ClientSocket *cs);
/** Connect the socket
* @param s THe socket
@@ -72,11 +70,11 @@ class SSLSocketIO : public SocketIO
*/
void Connect(ConnectionSocket *s, const Anope::string &target, int port);
- /** Check if this socket is connected
+ /** Called to potentially finish a pending connection
* @param s The socket
- * @return -1 for error, 0 for wait, 1 for connected
+ * @return SF_CONNECTED on success, SF_CONNECTING if still pending, and SF_DEAD on error.
*/
- int Connected(ConnectionSocket *s);
+ SocketFlag FinishConnect(ConnectionSocket *s);
/** Called when the socket is destructing
*/
@@ -194,7 +192,7 @@ void MySSLService::Init(Socket *s)
s->IO = new SSLSocketIO();
}
-SSLSocketIO::SSLSocketIO() : connected(-1), accepted(-1)
+SSLSocketIO::SSLSocketIO()
{
this->sslsock = NULL;
}
@@ -206,9 +204,9 @@ int SSLSocketIO::Recv(Socket *s, char *buf, size_t sz)
return i;
}
-int SSLSocketIO::Send(Socket *s, const Anope::string &buf)
+int SSLSocketIO::Send(Socket *s, const char *buf, size_t sz)
{
- int i = SSL_write(this->sslsock, buf.c_str(), buf.length());
+ int i = SSL_write(this->sslsock, buf, sz);
TotalWritten += i;
return i;
}
@@ -217,8 +215,20 @@ ClientSocket *SSLSocketIO::Accept(ListenSocket *s)
{
if (s->IO == &normalSocketIO)
throw SocketException("Attempting to accept on uninitialized socket with SSL");
-
- ClientSocket *newsocket = normalSocketIO.Accept(s);
+
+ sockaddrs conaddr;
+
+ socklen_t size = sizeof(conaddr);
+ int newsock = accept(s->GetFD(), &conaddr.sa, &size);
+
+#ifndef INVALID_SOCKET
+ const int INVALID_SOCKET = -1;
+#endif
+
+ if (newsock < 0 || newsock == INVALID_SOCKET)
+ throw SocketException("Unable to accept connection: " + Anope::LastError());
+
+ ClientSocket *newsocket = s->OnAccept(newsock, conaddr);
me->service.Init(newsocket);
SSLSocketIO *IO = debug_cast<SSLSocketIO *>(newsocket->IO);
@@ -231,47 +241,47 @@ ClientSocket *SSLSocketIO::Accept(ListenSocket *s)
if (!SSL_set_fd(IO->sslsock, newsocket->GetFD()))
throw SocketException("Unable to set SSL fd");
- int ret = SSL_accept(IO->sslsock);
- if (ret <= 0)
- {
- IO->accepted = 0;
- int error = SSL_get_error(IO->sslsock, ret);
- if (ret == -1 && (error == SSL_ERROR_WANT_READ || error == SSL_ERROR_WANT_WRITE))
- {
- SocketEngine::MarkWritable(newsocket);
- return newsocket;
- }
-
- throw SocketException("Unable to accept new SSL connection: " + Anope::string(ERR_error_string(ERR_get_error(), NULL)));
- }
+ newsocket->SetFlag(SF_ACCEPTING);
+ this->FinishAccept(newsocket);
- IO->accepted = 1;
return newsocket;
}
-int SSLSocketIO::Accepted(ClientSocket *cs)
+SocketFlag SSLSocketIO::FinishAccept(ClientSocket *cs)
{
- SSLSocketIO *IO = debug_cast<SSLSocketIO *>(cs->IO);
+ if (cs->IO == &normalSocketIO)
+ throw SocketException("Attempting to finish connect uninitialized socket with SSL");
+ else if (cs->HasFlag(SF_ACCEPTED))
+ return SF_ACCEPTED;
+ else if (!cs->HasFlag(SF_ACCEPTING))
+ throw SocketException("SSLSocketIO::FinishAccept called for a socket not accepted nor accepting?");
- if (IO->accepted == 0)
+ SSLSocketIO *IO = debug_cast<SSLSocketIO *>(cs->IO);
+
+ int ret = SSL_accept(IO->sslsock);
+ if (ret <= 0)
{
- int ret = SSL_accept(IO->sslsock);
- if (ret <= 0)
+ int error = SSL_get_error(IO->sslsock, ret);
+ if (ret == -1 && (error == SSL_ERROR_WANT_READ || error == SSL_ERROR_WANT_WRITE))
{
- int error = SSL_get_error(IO->sslsock, ret);
- if (ret == -1 && (error == SSL_ERROR_WANT_READ || error == SSL_ERROR_WANT_WRITE))
- {
- SocketEngine::MarkWritable(cs);
- return 0;
- }
-
- return -1;
+ SocketEngine::MarkWritable(cs);
+ return SF_ACCEPTING;
+ }
+ else
+ {
+ cs->OnError(ERR_error_string(ERR_get_error(), NULL));
+ cs->SetFlag(SF_DEAD);
+ cs->UnsetFlag(SF_ACCEPTING);
+ return SF_DEAD;
}
- IO->accepted = 1;
- return 0;
}
-
- return IO->accepted;
+ else
+ {
+ cs->SetFlag(SF_ACCEPTED);
+ cs->UnsetFlag(SF_ACCEPTING);
+ cs->OnAccept();
+ return SF_ACCEPTED;
+ }
}
void SSLSocketIO::Connect(ConnectionSocket *s, const Anope::string &target, int port)
@@ -279,62 +289,78 @@ void SSLSocketIO::Connect(ConnectionSocket *s, const Anope::string &target, int
if (s->IO == &normalSocketIO)
throw SocketException("Attempting to connect uninitialized socket with SSL");
- normalSocketIO.Connect(s, target, port);
-
- SSLSocketIO *IO = debug_cast<SSLSocketIO *>(s->IO);
+ s->UnsetFlag(SF_CONNECTING);
+ s->UnsetFlag(SF_CONNECTED);
- IO->sslsock = SSL_new(client_ctx);
- if (!IO->sslsock)
- throw SocketException("Unable to initialize SSL socket");
-
- if (!SSL_set_fd(IO->sslsock, s->GetFD()))
- throw SocketException("Unable to set SSL fd");
-
- int ret = SSL_connect(IO->sslsock);
- if (ret <= 0)
+ s->conaddr.pton(s->IsIPv6() ? AF_INET6 : AF_INET, target, port);
+ int c = connect(s->GetFD(), &s->conaddr.sa, s->conaddr.size());
+ if (c == -1)
{
- IO->connected = 0;
- int error = SSL_get_error(IO->sslsock, ret);
- if (ret == -1 && (error == SSL_ERROR_WANT_READ || error == SSL_ERROR_WANT_WRITE))
+ if (Anope::LastErrorCode() != EINPROGRESS)
+ {
+ s->OnError(Anope::LastError());
+ s->SetFlag(SF_DEAD);
+ return;
+ }
+ else
{
SocketEngine::MarkWritable(s);
+ s->SetFlag(SF_CONNECTING);
return;
}
-
- s->ProcessError();
}
-
- IO->connected = 1;
+ else
+ {
+ s->SetFlag(SF_CONNECTING);
+ this->FinishConnect(s);
+ }
}
-int SSLSocketIO::Connected(ConnectionSocket *s)
+SocketFlag SSLSocketIO::FinishConnect(ConnectionSocket *s)
{
if (s->IO == &normalSocketIO)
- throw SocketException("Connected() called for non ssl socket?");
-
- int i = SocketIO::Connected(s);
- if (i != 1)
- return i;
+ throw SocketException("Attempting to finish connect uninitialized socket with SSL");
+ else if (s->HasFlag(SF_CONNECTED))
+ return SF_CONNECTED;
+ else if (!s->HasFlag(SF_CONNECTING))
+ throw SocketException("SSLSocketIO::FinishConnect called for a socket not connected nor connecting?");
SSLSocketIO *IO = debug_cast<SSLSocketIO *>(s->IO);
- if (IO->connected == 0)
+ if (IO->sslsock == NULL)
{
- int ret = SSL_connect(IO->sslsock);
- if (ret <= 0)
- {
- int error = SSL_get_error(IO->sslsock, ret);
- if (ret == -1 && (error == SSL_ERROR_WANT_READ || error == SSL_ERROR_WANT_WRITE))
- return 0;
+ IO->sslsock = SSL_new(client_ctx);
+ if (!IO->sslsock)
+ throw SocketException("Unable to initialize SSL socket");
- s->ProcessError();
- return -1;
- }
- IO->connected = 1;
- return 0; // poll for next read/write (which will be real), don't assume ones available
+ if (!SSL_set_fd(IO->sslsock, s->GetFD()))
+ throw SocketException("Unable to set SSL fd");
}
- return IO->connected;
+ int ret = SSL_connect(IO->sslsock);
+ if (ret <= 0)
+ {
+ int error = SSL_get_error(IO->sslsock, ret);
+ if (ret == -1 && (error == SSL_ERROR_WANT_READ || error == SSL_ERROR_WANT_WRITE))
+ {
+ SocketEngine::MarkWritable(s);
+ return SF_CONNECTING;
+ }
+ else
+ {
+ s->OnError(ERR_error_string(ERR_get_error(), NULL));
+ s->UnsetFlag(SF_CONNECTING);
+ s->SetFlag(SF_DEAD);
+ return SF_DEAD;
+ }
+ }
+ else
+ {
+ s->UnsetFlag(SF_CONNECTING);
+ s->SetFlag(SF_CONNECTED);
+ s->OnConnect();
+ return SF_CONNECTED;
+ }
}
void SSLSocketIO::Destroy()
diff --git a/modules/extra/m_xmlrpc.cpp b/modules/extra/m_xmlrpc.cpp
index 9d301d684..38a733c7c 100644
--- a/modules/extra/m_xmlrpc.cpp
+++ b/modules/extra/m_xmlrpc.cpp
@@ -9,7 +9,7 @@ class MyXMLRPCClientSocket : public XMLRPCClientSocket
/* Used to skip the (optional) HTTP header, which we really don't care about */
bool in_query;
public:
- MyXMLRPCClientSocket(XMLRPCListenSocket *ls, int fd, const sockaddrs &addr) : XMLRPCClientSocket(ls, fd, addr), in_query(false)
+ MyXMLRPCClientSocket(XMLRPCListenSocket *ls, int fd, const sockaddrs &addr) : Socket(fd, ls->IsIPv6()), XMLRPCClientSocket(ls, addr), in_query(false)
{
}
@@ -245,9 +245,9 @@ class ModuleXMLRPC : public Module
Socket *s = it->second;
++it;
- if (s->Type == SOCKTYPE_CLIENT)
+ ClientSocket *cs = dynamic_cast<ClientSocket *>(s);
+ if (cs != NULL)
{
- ClientSocket *cs = debug_cast<ClientSocket *>(s);
for (unsigned i = 0; i < listen_sockets.size(); ++i)
if (cs->LS == listen_sockets[i])
{
diff --git a/modules/extra/xmlrpc.h b/modules/extra/xmlrpc.h
index 38f548c4f..364617ecc 100644
--- a/modules/extra/xmlrpc.h
+++ b/modules/extra/xmlrpc.h
@@ -3,13 +3,13 @@ class XMLRPCClientSocket;
class XMLRPCListenSocket;
class XMLRPCServiceInterface;
-class XMLRPCClientSocket : public ClientSocket
+class XMLRPCClientSocket : public ClientSocket, public BufferedSocket
{
protected:
Anope::string query;
public:
- XMLRPCClientSocket(ListenSocket *ls, int fd, const sockaddrs &addr) : ClientSocket(ls, fd, addr) { }
+ XMLRPCClientSocket(ListenSocket *ls, const sockaddrs &addr) : ClientSocket(ls, addr), BufferedSocket() { }
virtual ~XMLRPCClientSocket() { }
diff --git a/src/main.cpp b/src/main.cpp
index 3730c60ea..b00a6ee6e 100644
--- a/src/main.cpp
+++ b/src/main.cpp
@@ -81,7 +81,7 @@ class UpdateTimer : public Timer
}
};
-ConnectionSocket *UplinkSock = NULL;
+UplinkSocket *UplinkSock = NULL;
int CurrentUplink = 0;
static void Connect();
@@ -105,77 +105,72 @@ class ReconnectTimer : public Timer
}
};
-class UplinkSocket : public ConnectionSocket
+UplinkSocket::UplinkSocket() : Socket(-1, Config->Uplinks[CurrentUplink]->ipv6), ConnectionSocket(), BufferedSocket()
{
- public:
- UplinkSocket() : ConnectionSocket(Config->Uplinks[CurrentUplink]->ipv6)
- {
- UplinkSock = this;
- }
+ UplinkSock = this;
+}
- ~UplinkSocket()
+UplinkSocket::~UplinkSocket()
+{
+ if (ircdproto && Me && !Me->GetLinks().empty() && Me->GetLinks()[0]->IsSynced())
{
- if (ircdproto && Me && !Me->GetLinks().empty() && Me->GetLinks()[0]->IsSynced())
+ FOREACH_MOD(I_OnServerDisconnect, OnServerDisconnect());
+
+ for (Anope::insensitive_map<User *>::const_iterator it = UserListByNick.begin(); it != UserListByNick.end(); ++it)
{
- FOREACH_MOD(I_OnServerDisconnect, OnServerDisconnect());
+ User *u = it->second;
- for (Anope::insensitive_map<User *>::const_iterator it = UserListByNick.begin(); it != UserListByNick.end(); ++it)
+ if (u->server == Me)
{
- User *u = it->second;
-
- if (u->server == Me)
- {
- /* Don't use quitmsg here, it may contain information you don't want people to see */
- ircdproto->SendQuit(u, "Shutting down");
- BotInfo *bi = findbot(u->nick);
- if (bi != NULL)
- bi->introduced = false;
- }
+ /* Don't use quitmsg here, it may contain information you don't want people to see */
+ ircdproto->SendQuit(u, "Shutting down");
+ BotInfo *bi = findbot(u->nick);
+ if (bi != NULL)
+ bi->introduced = false;
}
-
- ircdproto->SendSquit(Config->ServerName, quitmsg);
-
- this->ProcessWrite(); // Write out the last bit
}
- for (unsigned i = Me->GetLinks().size(); i > 0; --i)
- if (!Me->GetLinks()[i - 1]->HasFlag(SERVER_JUPED))
- Me->GetLinks()[i - 1]->Delete(Me->GetName() + " " + Me->GetLinks()[i - 1]->GetName());
+ ircdproto->SendSquit(Config->ServerName, quitmsg);
- UplinkSock = NULL;
+ this->ProcessWrite(); // Write out the last bit
+ }
- Me->SetFlag(SERVER_SYNCING);
+ for (unsigned i = Me->GetLinks().size(); i > 0; --i)
+ if (!Me->GetLinks()[i - 1]->HasFlag(SERVER_JUPED))
+ Me->GetLinks()[i - 1]->Delete(Me->GetName() + " " + Me->GetLinks()[i - 1]->GetName());
- if (!quitting)
- {
- int Retry = Config->RetryWait;
- if (Retry <= 0)
- Retry = 60;
+ UplinkSock = NULL;
- Log() << "Retrying in " << Retry << " seconds";
- new ReconnectTimer(Retry);
- }
- }
+ Me->SetFlag(SERVER_SYNCING);
- bool Read(const Anope::string &buf)
+ if (!quitting)
{
- process(buf);
- return true;
- }
+ int Retry = Config->RetryWait;
+ if (Retry <= 0)
+ Retry = 60;
- void OnConnect()
- {
- Log() << "Successfully connected to " << Config->Uplinks[CurrentUplink]->host << ":" << Config->Uplinks[CurrentUplink]->port;
- ircdproto->SendConnect();
- FOREACH_MOD(I_OnServerConnect, OnServerConnect());
+ Log() << "Retrying in " << Retry << " seconds";
+ new ReconnectTimer(Retry);
}
+}
- void OnError(const Anope::string &error)
- {
- Log() << "Unable to connect to server " << Config->Uplinks[CurrentUplink]->host << ":" << Config->Uplinks[CurrentUplink]->port << (!error.empty() ? (": " + error) : "");
- this->SetFlag(SF_DEAD);
- }
-};
+bool UplinkSocket::Read(const Anope::string &buf)
+{
+ process(buf);
+ return true;
+}
+
+void UplinkSocket::OnConnect()
+{
+ Log() << "Successfully connected to " << Config->Uplinks[CurrentUplink]->host << ":" << Config->Uplinks[CurrentUplink]->port;
+ ircdproto->SendConnect();
+ FOREACH_MOD(I_OnServerConnect, OnServerConnect());
+}
+
+void UplinkSocket::OnError(const Anope::string &error)
+{
+ Log() << "Unable to connect to server " << Config->Uplinks[CurrentUplink]->host << ":" << Config->Uplinks[CurrentUplink]->port << (!error.empty() ? (": " + error) : "");
+}
static void Connect()
{
diff --git a/src/socket_clients.cpp b/src/socket_clients.cpp
new file mode 100644
index 000000000..4a43629a4
--- /dev/null
+++ b/src/socket_clients.cpp
@@ -0,0 +1,96 @@
+/*
+ *
+ * (C) 2003-2011 Anope Team
+ * Contact us at team@anope.org
+ *
+ * Please read COPYING and README for further details.
+ *
+ * Based on the original code of Epona by Lara.
+ * Based on the original code of Services by Andy Church.
+ */
+
+#include "services.h"
+
+ConnectionSocket::ConnectionSocket() : Socket()
+{
+}
+
+void ConnectionSocket::Connect(const Anope::string &TargetHost, int Port)
+{
+ this->IO->Connect(this, TargetHost, Port);
+}
+
+bool ConnectionSocket::Process()
+{
+ try
+ {
+ if (this->HasFlag(SF_CONNECTED))
+ return true;
+ else if (this->HasFlag(SF_CONNECTING))
+ this->SetFlag(this->IO->FinishConnect(this));
+ else
+ this->SetFlag(SF_DEAD);
+ }
+ catch (const SocketException &ex)
+ {
+ Log() << ex.GetReason();
+ }
+ return false;
+}
+
+void ConnectionSocket::ProcessError()
+{
+ int optval = 0;
+ socklen_t optlen = sizeof(optval);
+ getsockopt(this->GetFD(), SOL_SOCKET, SO_ERROR, reinterpret_cast<char *>(&optval), &optlen);
+ errno = optval;
+ this->OnError(optval ? Anope::LastError() : "");
+}
+
+void ConnectionSocket::OnConnect()
+{
+}
+
+void ConnectionSocket::OnError(const Anope::string &)
+{
+}
+
+ClientSocket::ClientSocket(ListenSocket *ls, const sockaddrs &addr) : Socket(), LS(ls), clientaddr(addr)
+{
+}
+
+bool ClientSocket::Process()
+{
+ try
+ {
+ if (this->HasFlag(SF_ACCEPTED))
+ return true;
+ else if (this->HasFlag(SF_ACCEPTING))
+ this->SetFlag(this->IO->FinishAccept(this));
+ else
+ this->SetFlag(SF_DEAD);
+ }
+ catch (const SocketException &ex)
+ {
+ Log() << ex.GetReason();
+ }
+ return false;
+}
+
+void ClientSocket::ProcessError()
+{
+ int optval = 0;
+ socklen_t optlen = sizeof(optval);
+ getsockopt(this->GetFD(), SOL_SOCKET, SO_ERROR, reinterpret_cast<char *>(&optval), &optlen);
+ errno = optval;
+ this->OnError(optval ? Anope::LastError() : "");
+}
+
+void ClientSocket::OnAccept()
+{
+}
+
+void ClientSocket::OnError(const Anope::string &error)
+{
+}
+
diff --git a/src/socket_transport.cpp b/src/socket_transport.cpp
new file mode 100644
index 000000000..a7b92ff20
--- /dev/null
+++ b/src/socket_transport.cpp
@@ -0,0 +1,185 @@
+/*
+ *
+ * (C) 2003-2011 Anope Team
+ * Contact us at team@anope.org
+ *
+ * Please read COPYING and README for further details.
+ *
+ * Based on the original code of Epona by Lara.
+ * Based on the original code of Services by Andy Church.
+ */
+
+#include "services.h"
+
+BufferedSocket::BufferedSocket()
+{
+}
+
+BufferedSocket::~BufferedSocket()
+{
+}
+
+bool BufferedSocket::ProcessRead()
+{
+ char tbuffer[NET_BUFSIZE];
+
+ this->RecvLen = 0;
+
+ int len = this->IO->Recv(this, tbuffer, sizeof(tbuffer) - 1);
+ if (len <= 0)
+ return false;
+
+ tbuffer[len] = 0;
+ this->RecvLen = len;
+
+ Anope::string sbuffer = this->extrabuf;
+ sbuffer += tbuffer;
+ this->extrabuf.clear();
+ size_t lastnewline = sbuffer.rfind('\n');
+ if (lastnewline == Anope::string::npos)
+ {
+ this->extrabuf = sbuffer;
+ return true;
+ }
+ if (lastnewline < sbuffer.length() - 1)
+ {
+ this->extrabuf = sbuffer.substr(lastnewline);
+ this->extrabuf.trim();
+ sbuffer = sbuffer.substr(0, lastnewline);
+ }
+
+ sepstream stream(sbuffer, '\n');
+
+ Anope::string tbuf;
+ while (stream.GetToken(tbuf))
+ {
+ tbuf.trim();
+ if (!tbuf.empty() && !Read(tbuf))
+ return false;
+ }
+
+ return true;
+}
+
+bool BufferedSocket::ProcessWrite()
+{
+ int count = this->IO->Send(this, this->WriteBuffer);
+ if (count <= -1)
+ return false;
+ this->WriteBuffer = this->WriteBuffer.substr(count);
+ if (this->WriteBuffer.empty())
+ SocketEngine::ClearWritable(this);
+
+ return true;
+}
+
+bool BufferedSocket::Read(const Anope::string &buf)
+{
+ return false;
+}
+
+void BufferedSocket::Write(const char *message, ...)
+{
+ va_list vi;
+ char tbuffer[BUFSIZE];
+
+ if (!message)
+ return;
+
+ va_start(vi, message);
+ vsnprintf(tbuffer, sizeof(tbuffer), message, vi);
+ va_end(vi);
+
+ Anope::string sbuf = tbuffer;
+ Write(sbuf);
+}
+
+void BufferedSocket::Write(const Anope::string &message)
+{
+ this->WriteBuffer += message + "\r\n";
+ SocketEngine::MarkWritable(this);
+}
+
+int BufferedSocket::ReadBufferLen() const
+{
+ return RecvLen;
+}
+
+int BufferedSocket::WriteBufferLen() const
+{
+ return this->WriteBuffer.length();
+}
+
+
+BinarySocket::DataBlock::DataBlock(const char *b, size_t l)
+{
+ this->buf = new char[l];
+ memcpy(this->buf, b, l);
+ this->len = l;
+}
+
+BinarySocket::DataBlock::~DataBlock()
+{
+ delete [] this->buf;
+}
+
+BinarySocket::BinarySocket()
+{
+}
+
+BinarySocket::~BinarySocket()
+{
+}
+
+bool BinarySocket::ProcessRead()
+{
+ char tbuffer[NET_BUFSIZE];
+
+ int len = this->IO->Recv(this, tbuffer, sizeof(tbuffer));
+ if (len <= 0)
+ return false;
+
+ return this->Read(tbuffer, len);
+}
+
+bool BinarySocket::ProcessWrite()
+{
+ if (this->WriteBuffer.empty())
+ {
+ SocketEngine::ClearWritable(this);
+ return true;
+ }
+
+ DataBlock *d = this->WriteBuffer.front();
+
+ int len = this->IO->Send(this, d->buf, d->len);
+ if (len <= -1)
+ return false;
+ else if (static_cast<size_t>(len) == d->len)
+ {
+ delete d;
+ this->WriteBuffer.pop_front();
+ }
+ else
+ {
+ d->buf += len;
+ d->len -= len;
+ }
+
+ if (this->WriteBuffer.empty())
+ SocketEngine::ClearWritable(this);
+
+ return true;
+}
+
+void BinarySocket::Write(const char *buffer, size_t l)
+{
+ this->WriteBuffer.push_back(new DataBlock(buffer, l));
+ SocketEngine::MarkWritable(this);
+}
+
+bool BinarySocket::Read(const char *buffer, size_t l)
+{
+ return true;
+}
+
diff --git a/src/socketengines/pipeengine_eventfd.cpp b/src/socketengines/pipeengine_eventfd.cpp
index aa712de3d..21931bb1b 100644
--- a/src/socketengines/pipeengine_eventfd.cpp
+++ b/src/socketengines/pipeengine_eventfd.cpp
@@ -1,64 +1,29 @@
#include "services.h"
#include <sys/eventfd.h>
-class PipeIO : public SocketIO
+Pipe::Pipe() : Socket(eventfd(0, EFD_NONBLOCK))
{
- public:
- /** Receive something from the buffer
- * @param s The socket
- * @param buf The buf to read to
- * @param sz How much to read
- * @return Number of bytes received
- */
- int Recv(Socket *s, char *buf, size_t sz)
- {
- static eventfd_t dummy;
- return !eventfd_read(s->GetFD(), &dummy);
- }
-
- /** Write something to the socket
- * @param s The socket
- * @param buf What to write
- * @return Number of bytes written
- */
- int Send(Socket *s, const Anope::string &buf)
- {
- return !eventfd_write(s->GetFD(), 1);
- }
-} pipeSocketIO;
-
-Pipe::Pipe() : BufferedSocket()
-{
- this->IO = &pipeSocketIO;
- this->Sock = eventfd(0, EFD_NONBLOCK);
if (this->Sock < 0)
- throw CoreException(Anope::string("Could not create pipe: ") + Anope::LastError());
-
- this->IPv6 = false;
+ throw CoreException("Could not create pipe: " + Anope::LastError());
SocketEngine::AddSocket(this);
}
-bool Pipe::ProcessRead()
+Pipe::~Pipe()
{
- this->IO->Recv(this, NULL, 0);
- return this->Read("");
}
-bool Pipe::Read(const Anope::string &)
+bool Pipe::ProcessRead()
{
+ eventfd_t dummy;
+ eventfd_read(this->GetFD(), &dummy);
this->OnNotify();
return true;
}
void Pipe::Notify()
{
- /* Note we send this immediatly. If use use Socket::Write and if this functions is called
- * from a thread, only epoll is able to pick up the change to this sockets want flags immediately
- * Other engines time out then pick up and write the change then read it back, which
- * is too slow for most things.
- */
- this->IO->Send(this, "");
+ eventfd_write(this->GetFD(), 1);
}
void Pipe::OnNotify()
diff --git a/src/socketengines/pipeengine_pipe.cpp b/src/socketengines/pipeengine_pipe.cpp
index b3ca15d87..ceaded8ea 100644
--- a/src/socketengines/pipeengine_pipe.cpp
+++ b/src/socketengines/pipeengine_pipe.cpp
@@ -1,67 +1,40 @@
#include "services.h"
-class PipeIO : public SocketIO
-{
- public:
- /** Receive something from the buffer
- * @param s The socket
- * @param buf The buf to read to
- * @param sz How much to read
- * @return Number of bytes received
- */
- int Recv(Socket *s, char *buf, size_t sz)
- {
- static char dummy[512];
- while (read(s->GetFD(), &dummy, 512) == 512);
- return 0;
- }
-
- /** Write something to the socket
- * @param s The socket
- * @param buf What to write
- * @return Number of bytes written
- */
- int Send(Socket *s, const Anope::string &buf)
- {
- static const char dummy = '*';
- Pipe *pipe = debug_cast<Pipe *>(s);
- return write(pipe->WritePipe, &dummy, 1);
- }
-} pipeSocketIO;
-
-Pipe::Pipe() : BufferedSocket()
+Pipe::Pipe() : Socket(-1)
{
int fds[2];
if (pipe(fds))
- throw CoreException(Anope::string("Could not create pipe: ") + Anope::LastError());
+ throw CoreException("Could not create pipe: " + Anope::LastError());
int flags = fcntl(fds[0], F_GETFL, 0);
fcntl(fds[0], F_SETFL, flags | O_NONBLOCK);
flags = fcntl(fds[1], F_GETFL, 0);
fcntl(fds[1], F_SETFL, flags | O_NONBLOCK);
- this->IO = &pipeSocketIO;
+ this->~Socket();
+
this->Sock = fds[0];
this->WritePipe = fds[1];
- this->IPv6 = false;
SocketEngine::AddSocket(this);
}
-bool Pipe::ProcessRead()
+Pipe::~Pipe()
{
- this->IO->Recv(this, NULL, 0);
- return this->Read("");
+ CloseSocket(this->WritePipe);
}
-bool Pipe::Read(const Anope::string &)
+bool Pipe::ProcessRead()
{
+ char dummy[512];
+ while (read(this->GetFD(), &dummy, 512) == 512);
this->OnNotify();
return true;
}
void Pipe::Notify()
{
- this->IO->Send(this, "");
+ const char dummy = '*';
+ write(this->WritePipe, &dummy, 1);
}
void Pipe::OnNotify()
diff --git a/src/socketengines/pipeengine_win32.cpp b/src/socketengines/pipeengine_win32.cpp
index 3bd18d0cd..f34472870 100644
--- a/src/socketengines/pipeengine_win32.cpp
+++ b/src/socketengines/pipeengine_win32.cpp
@@ -1,89 +1,50 @@
#include "services.h"
-static ClientSocket *newsocket = NULL;
-
-class LSocket : public ListenSocket
-{
- public:
- LSocket(const Anope::string &host, int port) : ListenSocket(host, port, false) { }
-
- ClientSocket *OnAccept(int fd, const sockaddrs &addr)
- {
- newsocket = new ClientSocket(this, fd, addr);
- return newsocket;
- }
-};
-
-class PipeIO : public SocketIO
+Pipe::Pipe() : Socket(-1)
{
- public:
- /** Receive something from the buffer
- * @param s The socket
- * @param buf The buf to read to
- * @param sz How much to read
- * @return Number of bytes received
- */
- int Recv(Socket *s, char *buf, size_t sz)
- {
- static char dummy[512];
- return recv(s->GetFD(), dummy, 512, 0);
- }
-
- /** Write something to the socket
- * @param s The socket
- * @param buf What to write
- * @return Number of bytes written
- */
- int Send(Socket *s, const Anope::string &buf)
- {
- static const char dummy = '*';
- Pipe *pipe = debug_cast<Pipe *>(s);
- return send(pipe->WritePipe, &dummy, 1, 0);
- }
-} pipeSocketIO;
+ sockaddrs localhost;
-Pipe::Pipe() : BufferedSocket()
-{
- LSocket lfs("127.0.0.1", 0);
+ localhost.pton(AF_INET, "127.0.0.1");
- int cfd = socket(AF_INET, SOCK_STREAM, 0);
+ int cfd = socket(AF_INET, SOCK_STREAM, 0), lfd = socket(AF_INET, SOCK_STREAM, 0);
if (cfd == -1)
throw CoreException("Error accepting new socket for Pipe");
+
+ if (bind(lfd, &localhost.sa, localhost.size()) == -1)
+ throw CoreException("Error accepting new socket for Pipe");
+ if (listen(lfd, 1) == -1)
+ throw CoreException("Error accepting new socket for Pipe");
- sockaddr_in addr;
- socklen_t sz = sizeof(addr);
- getsockname(lfs.GetFD(), reinterpret_cast<sockaddr *>(&addr), &sz);
+ sockaddrs lfd_addr;
+ socklen_t sz = sizeof(lfd_addr);
+ getsockname(lfd, &lfd_addr.sa, &sz);
- if (connect(cfd, reinterpret_cast<sockaddr *>(&addr), sz))
- throw CoreException("Error accepting new socket for Pipe");
- lfs.ProcessRead();
- if (!newsocket)
+ if (connect(cfd, &lfd_addr.sa, lfd_addr.size()))
throw CoreException("Error accepting new socket for Pipe");
+ CloseSocket(lfd);
- this->IO = &pipeSocketIO;
- this->Sock = cfd;
- this->WritePipe = newsocket->GetFD();
- this->IPv6 = false;
+ this->WritePipe = cfd;
SocketEngine::AddSocket(this);
- newsocket = NULL;
}
-bool Pipe::ProcessRead()
+Pipe::~Pipe()
{
- this->IO->Recv(this, NULL, 0);
- return this->Read("");
+ CloseSocket(this->WritePipe);
}
-bool Pipe::Read(const Anope::string &)
+bool Pipe::ProcessRead()
{
+ char dummy[512];
+ while (recv(this->GetFD(), dummy, 512, 0) == 512);
this->OnNotify();
return true;
}
void Pipe::Notify()
{
- this->IO->Send(this, "");
+ const char dummy = '*';
+ send(this->WritePipe, &dummy, 1, 0);
}
void Pipe::OnNotify()
diff --git a/src/socketengines/socketengine_epoll.cpp b/src/socketengines/socketengine_epoll.cpp
index ad9bef3ce..73f7e10d6 100644
--- a/src/socketengines/socketengine_epoll.cpp
+++ b/src/socketengines/socketengine_epoll.cpp
@@ -11,18 +11,12 @@ void SocketEngine::Init()
max = ulimit(4, 0);
if (max <= 0)
- {
- Log() << "Can't determine maximum number of open sockets";
- throw CoreException("Can't determine maximum number of open sockets");
- }
+ throw SocketException("Can't determine maximum number of open sockets");
EngineHandle = epoll_create(max / 4);
if (EngineHandle == -1)
- {
- Log() << "Could not initialize epoll socket engine: " << Anope::LastError();
- throw CoreException(Anope::string("Could not initialize epoll socket engine: ") + Anope::LastError());
- }
+ throw SocketException("Could not initialize epoll socket engine: " + Anope::LastError());
events = new epoll_event[max];
memset(events, 0, sizeof(epoll_event) * max);
@@ -53,10 +47,7 @@ void SocketEngine::AddSocket(Socket *s)
ev.data.fd = s->GetFD();
if (epoll_ctl(EngineHandle, EPOLL_CTL_ADD, ev.data.fd, &ev) == -1)
- {
- Log() << "Unable to add fd " << ev.data.fd << " to socketengine epoll: " << Anope::LastError();
- return;
- }
+ throw SocketException("Unable to add fd " + stringify(ev.data.fd) + " to epoll: " + Anope::LastError());
Sockets.insert(std::make_pair(ev.data.fd, s));
}
@@ -70,10 +61,7 @@ void SocketEngine::DelSocket(Socket *s)
ev.data.fd = s->GetFD();
if (epoll_ctl(EngineHandle, EPOLL_CTL_DEL, ev.data.fd, &ev) == -1)
- {
- Log() << "Unable to delete fd " << ev.data.fd << " from socketengine epoll: " << Anope::LastError();
- return;
- }
+ throw SocketException("Unable to remove fd " + stringify(ev.data.fd) + " from epoll: " + Anope::LastError());
Sockets.erase(ev.data.fd);
}
@@ -91,9 +79,9 @@ void SocketEngine::MarkWritable(Socket *s)
ev.data.fd = s->GetFD();
if (epoll_ctl(EngineHandle, EPOLL_CTL_MOD, ev.data.fd, &ev) == -1)
- Log() << "Unable to mark fd " << ev.data.fd << " as writable in socketengine epoll: " << Anope::LastError();
- else
- s->SetFlag(SF_WRITABLE);
+ throw SocketException("Unable to mark fd " + stringify(ev.data.fd) + " as writable in epoll: " + Anope::LastError());
+
+ s->SetFlag(SF_WRITABLE);
}
void SocketEngine::ClearWritable(Socket *s)
@@ -109,9 +97,9 @@ void SocketEngine::ClearWritable(Socket *s)
ev.data.fd = s->GetFD();
if (epoll_ctl(EngineHandle, EPOLL_CTL_MOD, ev.data.fd, &ev) == -1)
- Log() << "Unable to mark fd " << ev.data.fd << " as unwritable in socketengine epoll: " << Anope::LastError();
- else
- s->UnsetFlag(SF_WRITABLE);
+ throw SocketException("Unable clear mark fd " + stringify(ev.data.fd) + " as writable in epoll: " + Anope::LastError());
+
+ s->UnsetFlag(SF_WRITABLE);
}
void SocketEngine::Process()
@@ -130,10 +118,15 @@ void SocketEngine::Process()
for (int i = 0; i < total; ++i)
{
epoll_event *ev = &events[i];
- Socket *s = Sockets[ev->data.fd];
+
+ std::map<int, Socket *>::iterator it = Sockets.find(ev->data.fd);
+ if (it == Sockets.end())
+ continue;
+ Socket *s = it->second;
if (s->HasFlag(SF_DEAD))
continue;
+
if (ev->events & (EPOLLHUP | EPOLLERR))
{
s->ProcessError();
@@ -141,6 +134,9 @@ void SocketEngine::Process()
continue;
}
+ if (!s->Process())
+ continue;
+
if ((ev->events & EPOLLIN) && !s->ProcessRead())
s->SetFlag(SF_DEAD);
@@ -151,7 +147,10 @@ void SocketEngine::Process()
for (int i = 0; i < total; ++i)
{
epoll_event *ev = &events[i];
- Socket *s = Sockets[ev->data.fd];
+ std::map<int, Socket *>::iterator it = Sockets.find(ev->data.fd);
+ if (it == Sockets.end())
+ continue;
+ Socket *s = it->second;
if (s->HasFlag(SF_DEAD))
delete s;
diff --git a/src/socketengines/socketengine_poll.cpp b/src/socketengines/socketengine_poll.cpp
index 278485fa7..ba111659a 100644
--- a/src/socketengines/socketengine_poll.cpp
+++ b/src/socketengines/socketengine_poll.cpp
@@ -50,10 +50,7 @@ void SocketEngine::Shutdown()
void SocketEngine::AddSocket(Socket *s)
{
if (SocketCount == max)
- {
- Log() << "Unable to add fd " << s->GetFD() << " to socketengine poll, engine is full";
- return;
- }
+ throw SocketException("Unable to add fd " + stringify(s->GetFD()) + " to poll: " + Anope::LastError());
pollfd *ev = &events[SocketCount];
ev->fd = s->GetFD();
@@ -70,10 +67,7 @@ void SocketEngine::DelSocket(Socket *s)
{
std::map<int, int>::iterator pos = socket_positions.find(s->GetFD());
if (pos == socket_positions.end())
- {
- Log() << "Unable to delete unknown fd " << s->GetFD() << " from socketengine poll";
- return;
- }
+ throw SocketException("Unable to remove fd " + stringify(s->GetFD()) + " from poll");
if (pos->second != SocketCount - 1)
{
@@ -100,10 +94,7 @@ void SocketEngine::MarkWritable(Socket *s)
std::map<int, int>::iterator pos = socket_positions.find(s->GetFD());
if (pos == socket_positions.end())
- {
- Log() << "Unable to mark unknown fd " << s->GetFD() << " as writable";
- return;
- }
+ throw SocketException("Unable to mark fd " + stringify(s->GetFD()) + " as writable in poll");
pollfd *ev = &events[pos->second];
ev->events |= POLLOUT;
@@ -118,10 +109,7 @@ void SocketEngine::ClearWritable(Socket *s)
std::map<int, int>::iterator pos = socket_positions.find(s->GetFD());
if (pos == socket_positions.end())
- {
- Log() << "Unable to mark unknown fd " << s->GetFD() << " as writable";
- return;
- }
+ throw SocketException("Unable clear mark fd " + stringify(s->GetFD()) + " as writable in poll");
pollfd *ev = &events[pos->second];
ev->events &= ~POLLOUT;
@@ -149,10 +137,14 @@ void SocketEngine::Process()
if (ev->revents != 0)
++processed;
- Socket *s = Sockets[ev->fd];
+ std::map<int, Socket *>::iterator it = Sockets.find(ev->fd);
+ if (it == Sockets.end())
+ continue;
+ Socket *s = it->second;
if (s->HasFlag(SF_DEAD))
continue;
+
if (ev->revents & (POLLERR | POLLRDHUP))
{
s->ProcessError();
@@ -160,6 +152,9 @@ void SocketEngine::Process()
continue;
}
+ if (!s->Process())
+ continue;
+
if ((ev->revents & POLLIN) && !s->ProcessRead())
s->SetFlag(SF_DEAD);
@@ -170,7 +165,10 @@ void SocketEngine::Process()
for (int i = 0; i < SocketCount; ++i)
{
pollfd *ev = &events[i];
- Socket *s = Sockets[ev->fd];
+ std::map<int, Socket *>::iterator it = Sockets.find(ev->fd);
+ if (it == Sockets.end())
+ continue;
+ Socket *s = it->second;
if (s->HasFlag(SF_DEAD))
delete s;
diff --git a/src/socketengines/socketengine_select.cpp b/src/socketengines/socketengine_select.cpp
index 34a86f680..ad6e04ca9 100644
--- a/src/socketengines/socketengine_select.cpp
+++ b/src/socketengines/socketengine_select.cpp
@@ -96,19 +96,27 @@ void SocketEngine::Process()
{
Socket *s = it->second;
- if (FD_ISSET(s->GetFD(), &efdset) || FD_ISSET(s->GetFD(), &rfdset) || FD_ISSET(s->GetFD(), &wfdset))
+ bool has_read = FD_ISSET(s->GetFD(), &rfdset), has_write = FD_ISSET(s->GetFD(), &wfdset), has_error = FD_ISSET(s->GetFD(), &efdset);
+ if (has_read || has_write || has_error)
++processed;
+
if (s->HasFlag(SF_DEAD))
continue;
- if (FD_ISSET(s->GetFD(), &efdset))
+
+ if (has_error)
{
s->ProcessError();
s->SetFlag(SF_DEAD);
continue;
}
- if (FD_ISSET(s->GetFD(), &rfdset) && !s->ProcessRead())
+
+ if (!s->Process())
+ continue;
+
+ if (has_read && !s->ProcessRead())
s->SetFlag(SF_DEAD);
- if (FD_ISSET(s->GetFD(), &wfdset) && !s->ProcessWrite())
+
+ if (has_write && !s->ProcessWrite())
s->SetFlag(SF_DEAD);
}
diff --git a/src/sockets.cpp b/src/sockets.cpp
index b7f0a5f9a..541e3bb3a 100644
--- a/src/sockets.cpp
+++ b/src/sockets.cpp
@@ -260,15 +260,19 @@ int SocketIO::Recv(Socket *s, char *buf, size_t sz)
/** Write something to the socket
* @param s The socket
- * @param buf What to write
- * @return Number of bytes written
+ * @param buf The data to write
+ * @param size The length of the data
*/
-int SocketIO::Send(Socket *s, const Anope::string &buf)
+int SocketIO::Send(Socket *s, const char *buf, size_t sz)
{
- size_t i = send(s->GetFD(), buf.c_str(), buf.length(), 0);
+ size_t i = send(s->GetFD(), buf, sz, 0);
TotalWritten += i;
return i;
}
+int SocketIO::Send(Socket *s, const Anope::string &buf)
+{
+ return this->Send(s, buf.c_str(), buf.length());
+}
/** Accept a connection from a socket
* @param s The socket
@@ -282,22 +286,27 @@ ClientSocket *SocketIO::Accept(ListenSocket *s)
int newsock = accept(s->GetFD(), &conaddr.sa, &size);
#ifndef INVALID_SOCKET
-# define INVALID_SOCKET -1
+ static const int INVALID_SOCKET = -1;
#endif
if (newsock >= 0 && newsock != INVALID_SOCKET)
- return s->OnAccept(newsock, conaddr);
+ {
+ ClientSocket *ns = s->OnAccept(newsock, conaddr);
+ ns->SetFlag(SF_ACCEPTED);
+ ns->OnAccept();
+ return ns;
+ }
else
throw SocketException("Unable to accept connection: " + Anope::LastError());
}
-/** Check if a connection has been accepted
- * @param s The client socket
- * @return -1 on error, 0 to wait, 1 on success
+/** Finished accepting a connection from a socket
+ * @param s The socket
+ * @return SF_ACCEPTED if accepted, SF_ACCEPTING if still in process, SF_DEAD on error
*/
-int SocketIO::Accepted(ClientSocket *cs)
+SocketFlag SocketIO::FinishAccept(ClientSocket *cs)
{
- return 1;
+ return SF_ACCEPTED;
}
/** Bind a socket
@@ -319,6 +328,8 @@ void SocketIO::Bind(Socket *s, const Anope::string &ip, int port)
*/
void SocketIO::Connect(ConnectionSocket *s, const Anope::string &target, int port)
{
+ s->UnsetFlag(SF_CONNECTING);
+ s->UnsetFlag(SF_CONNECTED);
s->conaddr.pton(s->IsIPv6() ? AF_INET6 : AF_INET, target, port);
int c = connect(s->GetFD(), &s->conaddr.sa, s->conaddr.size());
if (c == -1)
@@ -326,30 +337,51 @@ void SocketIO::Connect(ConnectionSocket *s, const Anope::string &target, int por
if (Anope::LastErrorCode() != EINPROGRESS)
s->OnError(Anope::LastError());
else
+ {
SocketEngine::MarkWritable(s);
+ s->SetFlag(SF_CONNECTING);
+ }
}
else
{
- s->connected = true;
+ s->SetFlag(SF_CONNECTED);
s->OnConnect();
}
}
-/** Check if this socket is connected
+/** Called to potentially finish a pending connection
* @param s The socket
- * @return -1 for error, 0 for wait, 1 for connected
+ * @return SF_CONNECTED on success, SF_CONNECTING if still pending, and SF_DEAD on error.
*/
-int SocketIO::Connected(ConnectionSocket *s)
+SocketFlag SocketIO::FinishConnect(ConnectionSocket *s)
{
- return s->connected == true ? 1 : -1;
+ if (s->HasFlag(SF_CONNECTED))
+ return SF_CONNECTED;
+ else if (!s->HasFlag(SF_CONNECTING))
+ throw SocketException("SocketIO::FinishConnect called for a socket not connected nor connecting?");
+
+ int optval = 0;
+ socklen_t optlen = sizeof(optval);
+ if (!getsockopt(s->GetFD(), SOL_SOCKET, SO_ERROR, reinterpret_cast<char *>(&optval), &optlen) && !optval)
+ {
+ s->SetFlag(SF_CONNECTED);
+ s->UnsetFlag(SF_CONNECTING);
+ s->OnConnect();
+ return SF_CONNECTED;
+ }
+ else
+ {
+ errno = optval;
+ s->ProcessError();
+ return SF_DEAD;
+ }
}
-/** Empty constructor, used for things such as the pipe socket
+/** Empty constructor, should not be called.
*/
-Socket::Socket() : Flags<SocketFlag, 2>(SocketFlagStrings)
+Socket::Socket() : Flags<SocketFlag>(SocketFlagStrings)
{
- this->Type = SOCKTYPE_BASE;
- this->IO = &normalSocketIO;
+ throw CoreException("Socket::Socket() ?");
}
/** Constructor
@@ -357,12 +389,11 @@ Socket::Socket() : Flags<SocketFlag, 2>(SocketFlagStrings)
* @param ipv6 IPv6?
* @param type The socket type, defaults to SOCK_STREAM
*/
-Socket::Socket(int sock, bool ipv6, int type) : Flags<SocketFlag, 2>(SocketFlagStrings)
+Socket::Socket(int sock, bool ipv6, int type) : Flags<SocketFlag>(SocketFlagStrings)
{
- this->Type = SOCKTYPE_BASE;
this->IO = &normalSocketIO;
this->IPv6 = ipv6;
- if (sock == 0)
+ if (sock == -1)
this->Sock = socket(this->IPv6 ? AF_INET6 : AF_INET, type, 0);
else
this->Sock = sock;
@@ -432,163 +463,35 @@ void Socket::Bind(const Anope::string &ip, int port)
this->IO->Bind(this, ip, port);
}
-/** Called when there is something to be received for this socket
- * @return true on success, false to drop this socket
+/** Called when there either is a read or write event.
+ * @return true to continue to call ProcessRead/ProcessWrite, false to not continue
*/
-bool Socket::ProcessRead()
+bool Socket::Process()
{
return true;
}
-/** Called when the socket is ready to be written to
- * @return true on success, false to drop this socket
- */
-bool Socket::ProcessWrite()
-{
- return true;
-}
-
-/** Called when there is an error for this socket
- * @return true on success, false to drop this socket
- */
-void Socket::ProcessError()
-{
-}
-
-/** Constructor for pipe socket
- */
-BufferedSocket::BufferedSocket() : Socket()
-{
- this->Type = SOCKTYPE_BUFFERED;
-}
-
-/** Constructor
- * @param fd FD to use
- * @param ipv6 true for ipv6
- * @param type socket type, defaults to SOCK_STREAM
- */
-BufferedSocket::BufferedSocket(int fd, bool ipv6, int type) : Socket(fd, ipv6, type)
-{
- this->Type = SOCKTYPE_BUFFERED;
-}
-
-/** Default destructor
- */
-BufferedSocket::~BufferedSocket()
-{
-}
-
/** Called when there is something to be received for this socket
* @return true on success, false to drop this socket
*/
-bool BufferedSocket::ProcessRead()
+bool Socket::ProcessRead()
{
- char tbuffer[NET_BUFSIZE];
-
- this->RecvLen = 0;
-
- int len = this->IO->Recv(this, tbuffer, sizeof(tbuffer) - 1);
- if (len <= 0)
- return false;
-
- tbuffer[len] = 0;
- this->RecvLen = len;
-
- Anope::string sbuffer = this->extrabuf;
- sbuffer += tbuffer;
- this->extrabuf.clear();
- size_t lastnewline = sbuffer.rfind('\n');
- if (lastnewline == Anope::string::npos)
- {
- this->extrabuf = sbuffer;
- return true;
- }
- if (lastnewline < sbuffer.length() - 1)
- {
- this->extrabuf = sbuffer.substr(lastnewline);
- this->extrabuf.trim();
- sbuffer = sbuffer.substr(0, lastnewline);
- }
-
- sepstream stream(sbuffer, '\n');
-
- Anope::string tbuf;
- while (stream.GetToken(tbuf))
- {
- tbuf.trim();
- if (!tbuf.empty() && !Read(tbuf))
- return false;
- }
-
return true;
}
/** Called when the socket is ready to be written to
* @return true on success, false to drop this socket
*/
-bool BufferedSocket::ProcessWrite()
+bool Socket::ProcessWrite()
{
- int count = this->IO->Send(this, this->WriteBuffer);
- if (count <= -1)
- return false;
- this->WriteBuffer = this->WriteBuffer.substr(count);
- if (this->WriteBuffer.empty())
- SocketEngine::ClearWritable(this);
-
return true;
}
-/** Called with a line received from the socket
- * @param buf The line
- * @return true to continue reading, false to drop the socket
- */
-bool BufferedSocket::Read(const Anope::string &buf)
-{
- return false;
-}
-
-/** Write to the socket
- * @param message The message
- */
-void BufferedSocket::Write(const char *message, ...)
-{
- va_list vi;
- char tbuffer[BUFSIZE];
-
- if (!message)
- return;
-
- va_start(vi, message);
- vsnprintf(tbuffer, sizeof(tbuffer), message, vi);
- va_end(vi);
-
- Anope::string sbuf = tbuffer;
- Write(sbuf);
-}
-
-/** Write to the socket
- * @param message The message
- */
-void BufferedSocket::Write(const Anope::string &message)
-{
- this->WriteBuffer += message + "\r\n";
- SocketEngine::MarkWritable(this);
-}
-
-/** Get the length of the read buffer
- * @return The length of the read buffer
- */
-int BufferedSocket::ReadBufferLen() const
-{
- return RecvLen;
-}
-
-/** Get the length of the write buffer
- * @return The length of the write buffer
+/** Called when there is an error for this socket
+ * @return true on success, false to drop this socket
*/
-int BufferedSocket::WriteBufferLen() const
+void Socket::ProcessError()
{
- return this->WriteBuffer.length();
}
/** Constructor
@@ -596,16 +499,20 @@ int BufferedSocket::WriteBufferLen() const
* @param port The port to listen on
* @param ipv6 true for ipv6
*/
-ListenSocket::ListenSocket(const Anope::string &bindip, int port, bool ipv6) : Socket(0, ipv6)
+ListenSocket::ListenSocket(const Anope::string &bindip, int port, bool ipv6) : Socket(-1, ipv6)
{
- this->Type = SOCKTYPE_LISTEN;
this->SetNonBlocking();
+#ifndef _WIN32
+ int op = 1;
+ setsockopt(this->GetFD(), SOL_SOCKET, SO_REUSEADDR, &op, sizeof(op));
+#endif
+
this->bindaddr.pton(IPv6 ? AF_INET6 : AF_INET, bindip, port);
this->IO->Bind(this, bindip, port);
if (listen(Sock, SOMAXCONN) == -1)
- throw SocketException(Anope::string("Unable to listen: ") + Anope::LastError());
+ throw SocketException("Unable to listen: " + Anope::LastError());
}
/** Destructor
@@ -629,148 +536,3 @@ bool ListenSocket::ProcessRead()
return true;
}
-/** Called when a connection is accepted
- * @param fd The FD for the new connection
- * @param addr The sockaddr for where the connection came from
- * @return The new socket
- */
-ClientSocket *ListenSocket::OnAccept(int fd, const sockaddrs &addr)
-{
- return new ClientSocket(this, fd, addr);
-}
-
-/** Constructor
- * @param ipv6 true to use IPv6
- * @param type The socket type, defaults to SOCK_STREAM
- */
-ConnectionSocket::ConnectionSocket(bool ipv6, int type) : BufferedSocket(0, ipv6, type), connected(false)
-{
- this->Type = SOCKTYPE_CONNECTION;
-}
-
-/** Connect the socket
- * @param TargetHost The target host to connect to
- * @param Port The target port to connect to
- */
-void ConnectionSocket::Connect(const Anope::string &TargetHost, int Port)
-{
- this->IO->Connect(this, TargetHost, Port);
-}
-
-/** Called when there is something to be received for this socket
- * @return true on success, false to drop this socket
- */
-bool ConnectionSocket::ProcessRead()
-{
- if (!this->connected)
- {
- int optval = 0;
- socklen_t optlen = sizeof(optval);
- if (!getsockopt(this->GetFD(), SOL_SOCKET, SO_ERROR, reinterpret_cast<char *>(&optval), &optlen) && !optval)
- {
- this->connected = true;
- this->OnConnect();
- }
- else
- errno = optval;
- }
-
- int i = this->IO->Connected(this);
- if (i == 1)
- return BufferedSocket::ProcessRead();
- else if (i == 0)
- return true;
-
- this->OnError(Anope::LastError());
- return false;
-}
-
-/** Called when the socket is ready to be written to
- * @return true on success, false to drop this socket
- */
-bool ConnectionSocket::ProcessWrite()
-{
- if (!this->connected)
- {
- int optval = 0;
- socklen_t optlen = sizeof(optval);
- if (!getsockopt(this->GetFD(), SOL_SOCKET, SO_ERROR, reinterpret_cast<char *>(&optval), &optlen) && !optval)
- {
- this->connected = true;
- this->OnConnect();
- }
- else
- errno = optval;
- }
-
- int i = this->IO->Connected(this);
- if (i == 1)
- return BufferedSocket::ProcessWrite();
- else if (i == 0)
- return true;
-
- this->OnError(Anope::LastError());
- return false;
-}
-
-/** Called when there is an error for this socket
- * @return true on success, false to drop this socket
- */
-void ConnectionSocket::ProcessError()
-{
- int optval = 0;
- socklen_t optlen = sizeof(optval);
- getsockopt(this->GetFD(), SOL_SOCKET, SO_ERROR, reinterpret_cast<char *>(&optval), &optlen);
- errno = optval;
- this->OnError(optval ? Anope::LastError() : "");
-}
-
-/** Called on a successful connect
- */
-void ConnectionSocket::OnConnect()
-{
-}
-
-/** Called when a connection is not successful
- * @param error The error
- */
-void ConnectionSocket::OnError(const Anope::string &error)
-{
-}
-
-/** Constructor
- * @param ls Listen socket this connection is from
- * @param fd New FD for this socket
- * @param addr Address the connection came from
- */
-ClientSocket::ClientSocket(ListenSocket *ls, int fd, const sockaddrs &addr) : BufferedSocket(fd, ls->IsIPv6()), LS(ls), clientaddr(addr)
-{
- this->Type = SOCKTYPE_CLIENT;
-}
-
-/** Called when there is something to be received for this socket
- * @return true on success, false to drop this socket
- */
-bool ClientSocket::ProcessRead()
-{
- int i = this->IO->Accepted(this);
- if (i == 1)
- return BufferedSocket::ProcessRead();
- else if (i == 0)
- return true;
- return false;
-}
-
-/** Called when the socket is ready to be written to
- * @return true on success, false to drop this socket
- */
-bool ClientSocket::ProcessWrite()
-{
- int i = this->IO->Accepted(this);
- if (i == 1)
- return BufferedSocket::ProcessWrite();
- else if (i == 0)
- return true;
- return false;
-}
-