summaryrefslogtreecommitdiff
path: root/src/socketengines
diff options
context:
space:
mode:
authorAdam <Adam@anope.org>2011-08-21 13:38:42 -0400
committerAdam <Adam@anope.org>2011-09-10 01:55:09 -0400
commit2eb708e5ad8b259876d24d828f7472b77864c256 (patch)
treebed6b70d4bc67eb413453a116e77f8f724cdf3fd /src/socketengines
parent4fcb371bc8813cd647b7769a64d586e3a57d684d (diff)
Cleaned up some of the socket code, cleaned up the pipe engines, added support for binary sockets, and cleaned up the asynch connect/accept code
Diffstat (limited to 'src/socketengines')
-rw-r--r--src/socketengines/pipeengine_eventfd.cpp49
-rw-r--r--src/socketengines/pipeengine_pipe.cpp49
-rw-r--r--src/socketengines/pipeengine_win32.cpp83
-rw-r--r--src/socketengines/socketengine_epoll.cpp47
-rw-r--r--src/socketengines/socketengine_poll.cpp34
-rw-r--r--src/socketengines/socketengine_select.cpp16
6 files changed, 91 insertions, 187 deletions
diff --git a/src/socketengines/pipeengine_eventfd.cpp b/src/socketengines/pipeengine_eventfd.cpp
index aa712de3d..21931bb1b 100644
--- a/src/socketengines/pipeengine_eventfd.cpp
+++ b/src/socketengines/pipeengine_eventfd.cpp
@@ -1,64 +1,29 @@
#include "services.h"
#include <sys/eventfd.h>
-class PipeIO : public SocketIO
+Pipe::Pipe() : Socket(eventfd(0, EFD_NONBLOCK))
{
- public:
- /** Receive something from the buffer
- * @param s The socket
- * @param buf The buf to read to
- * @param sz How much to read
- * @return Number of bytes received
- */
- int Recv(Socket *s, char *buf, size_t sz)
- {
- static eventfd_t dummy;
- return !eventfd_read(s->GetFD(), &dummy);
- }
-
- /** Write something to the socket
- * @param s The socket
- * @param buf What to write
- * @return Number of bytes written
- */
- int Send(Socket *s, const Anope::string &buf)
- {
- return !eventfd_write(s->GetFD(), 1);
- }
-} pipeSocketIO;
-
-Pipe::Pipe() : BufferedSocket()
-{
- this->IO = &pipeSocketIO;
- this->Sock = eventfd(0, EFD_NONBLOCK);
if (this->Sock < 0)
- throw CoreException(Anope::string("Could not create pipe: ") + Anope::LastError());
-
- this->IPv6 = false;
+ throw CoreException("Could not create pipe: " + Anope::LastError());
SocketEngine::AddSocket(this);
}
-bool Pipe::ProcessRead()
+Pipe::~Pipe()
{
- this->IO->Recv(this, NULL, 0);
- return this->Read("");
}
-bool Pipe::Read(const Anope::string &)
+bool Pipe::ProcessRead()
{
+ eventfd_t dummy;
+ eventfd_read(this->GetFD(), &dummy);
this->OnNotify();
return true;
}
void Pipe::Notify()
{
- /* Note we send this immediatly. If use use Socket::Write and if this functions is called
- * from a thread, only epoll is able to pick up the change to this sockets want flags immediately
- * Other engines time out then pick up and write the change then read it back, which
- * is too slow for most things.
- */
- this->IO->Send(this, "");
+ eventfd_write(this->GetFD(), 1);
}
void Pipe::OnNotify()
diff --git a/src/socketengines/pipeengine_pipe.cpp b/src/socketengines/pipeengine_pipe.cpp
index b3ca15d87..ceaded8ea 100644
--- a/src/socketengines/pipeengine_pipe.cpp
+++ b/src/socketengines/pipeengine_pipe.cpp
@@ -1,67 +1,40 @@
#include "services.h"
-class PipeIO : public SocketIO
-{
- public:
- /** Receive something from the buffer
- * @param s The socket
- * @param buf The buf to read to
- * @param sz How much to read
- * @return Number of bytes received
- */
- int Recv(Socket *s, char *buf, size_t sz)
- {
- static char dummy[512];
- while (read(s->GetFD(), &dummy, 512) == 512);
- return 0;
- }
-
- /** Write something to the socket
- * @param s The socket
- * @param buf What to write
- * @return Number of bytes written
- */
- int Send(Socket *s, const Anope::string &buf)
- {
- static const char dummy = '*';
- Pipe *pipe = debug_cast<Pipe *>(s);
- return write(pipe->WritePipe, &dummy, 1);
- }
-} pipeSocketIO;
-
-Pipe::Pipe() : BufferedSocket()
+Pipe::Pipe() : Socket(-1)
{
int fds[2];
if (pipe(fds))
- throw CoreException(Anope::string("Could not create pipe: ") + Anope::LastError());
+ throw CoreException("Could not create pipe: " + Anope::LastError());
int flags = fcntl(fds[0], F_GETFL, 0);
fcntl(fds[0], F_SETFL, flags | O_NONBLOCK);
flags = fcntl(fds[1], F_GETFL, 0);
fcntl(fds[1], F_SETFL, flags | O_NONBLOCK);
- this->IO = &pipeSocketIO;
+ this->~Socket();
+
this->Sock = fds[0];
this->WritePipe = fds[1];
- this->IPv6 = false;
SocketEngine::AddSocket(this);
}
-bool Pipe::ProcessRead()
+Pipe::~Pipe()
{
- this->IO->Recv(this, NULL, 0);
- return this->Read("");
+ CloseSocket(this->WritePipe);
}
-bool Pipe::Read(const Anope::string &)
+bool Pipe::ProcessRead()
{
+ char dummy[512];
+ while (read(this->GetFD(), &dummy, 512) == 512);
this->OnNotify();
return true;
}
void Pipe::Notify()
{
- this->IO->Send(this, "");
+ const char dummy = '*';
+ write(this->WritePipe, &dummy, 1);
}
void Pipe::OnNotify()
diff --git a/src/socketengines/pipeengine_win32.cpp b/src/socketengines/pipeengine_win32.cpp
index 3bd18d0cd..f34472870 100644
--- a/src/socketengines/pipeengine_win32.cpp
+++ b/src/socketengines/pipeengine_win32.cpp
@@ -1,89 +1,50 @@
#include "services.h"
-static ClientSocket *newsocket = NULL;
-
-class LSocket : public ListenSocket
-{
- public:
- LSocket(const Anope::string &host, int port) : ListenSocket(host, port, false) { }
-
- ClientSocket *OnAccept(int fd, const sockaddrs &addr)
- {
- newsocket = new ClientSocket(this, fd, addr);
- return newsocket;
- }
-};
-
-class PipeIO : public SocketIO
+Pipe::Pipe() : Socket(-1)
{
- public:
- /** Receive something from the buffer
- * @param s The socket
- * @param buf The buf to read to
- * @param sz How much to read
- * @return Number of bytes received
- */
- int Recv(Socket *s, char *buf, size_t sz)
- {
- static char dummy[512];
- return recv(s->GetFD(), dummy, 512, 0);
- }
-
- /** Write something to the socket
- * @param s The socket
- * @param buf What to write
- * @return Number of bytes written
- */
- int Send(Socket *s, const Anope::string &buf)
- {
- static const char dummy = '*';
- Pipe *pipe = debug_cast<Pipe *>(s);
- return send(pipe->WritePipe, &dummy, 1, 0);
- }
-} pipeSocketIO;
+ sockaddrs localhost;
-Pipe::Pipe() : BufferedSocket()
-{
- LSocket lfs("127.0.0.1", 0);
+ localhost.pton(AF_INET, "127.0.0.1");
- int cfd = socket(AF_INET, SOCK_STREAM, 0);
+ int cfd = socket(AF_INET, SOCK_STREAM, 0), lfd = socket(AF_INET, SOCK_STREAM, 0);
if (cfd == -1)
throw CoreException("Error accepting new socket for Pipe");
+
+ if (bind(lfd, &localhost.sa, localhost.size()) == -1)
+ throw CoreException("Error accepting new socket for Pipe");
+ if (listen(lfd, 1) == -1)
+ throw CoreException("Error accepting new socket for Pipe");
- sockaddr_in addr;
- socklen_t sz = sizeof(addr);
- getsockname(lfs.GetFD(), reinterpret_cast<sockaddr *>(&addr), &sz);
+ sockaddrs lfd_addr;
+ socklen_t sz = sizeof(lfd_addr);
+ getsockname(lfd, &lfd_addr.sa, &sz);
- if (connect(cfd, reinterpret_cast<sockaddr *>(&addr), sz))
- throw CoreException("Error accepting new socket for Pipe");
- lfs.ProcessRead();
- if (!newsocket)
+ if (connect(cfd, &lfd_addr.sa, lfd_addr.size()))
throw CoreException("Error accepting new socket for Pipe");
+ CloseSocket(lfd);
- this->IO = &pipeSocketIO;
- this->Sock = cfd;
- this->WritePipe = newsocket->GetFD();
- this->IPv6 = false;
+ this->WritePipe = cfd;
SocketEngine::AddSocket(this);
- newsocket = NULL;
}
-bool Pipe::ProcessRead()
+Pipe::~Pipe()
{
- this->IO->Recv(this, NULL, 0);
- return this->Read("");
+ CloseSocket(this->WritePipe);
}
-bool Pipe::Read(const Anope::string &)
+bool Pipe::ProcessRead()
{
+ char dummy[512];
+ while (recv(this->GetFD(), dummy, 512, 0) == 512);
this->OnNotify();
return true;
}
void Pipe::Notify()
{
- this->IO->Send(this, "");
+ const char dummy = '*';
+ send(this->WritePipe, &dummy, 1, 0);
}
void Pipe::OnNotify()
diff --git a/src/socketengines/socketengine_epoll.cpp b/src/socketengines/socketengine_epoll.cpp
index ad9bef3ce..73f7e10d6 100644
--- a/src/socketengines/socketengine_epoll.cpp
+++ b/src/socketengines/socketengine_epoll.cpp
@@ -11,18 +11,12 @@ void SocketEngine::Init()
max = ulimit(4, 0);
if (max <= 0)
- {
- Log() << "Can't determine maximum number of open sockets";
- throw CoreException("Can't determine maximum number of open sockets");
- }
+ throw SocketException("Can't determine maximum number of open sockets");
EngineHandle = epoll_create(max / 4);
if (EngineHandle == -1)
- {
- Log() << "Could not initialize epoll socket engine: " << Anope::LastError();
- throw CoreException(Anope::string("Could not initialize epoll socket engine: ") + Anope::LastError());
- }
+ throw SocketException("Could not initialize epoll socket engine: " + Anope::LastError());
events = new epoll_event[max];
memset(events, 0, sizeof(epoll_event) * max);
@@ -53,10 +47,7 @@ void SocketEngine::AddSocket(Socket *s)
ev.data.fd = s->GetFD();
if (epoll_ctl(EngineHandle, EPOLL_CTL_ADD, ev.data.fd, &ev) == -1)
- {
- Log() << "Unable to add fd " << ev.data.fd << " to socketengine epoll: " << Anope::LastError();
- return;
- }
+ throw SocketException("Unable to add fd " + stringify(ev.data.fd) + " to epoll: " + Anope::LastError());
Sockets.insert(std::make_pair(ev.data.fd, s));
}
@@ -70,10 +61,7 @@ void SocketEngine::DelSocket(Socket *s)
ev.data.fd = s->GetFD();
if (epoll_ctl(EngineHandle, EPOLL_CTL_DEL, ev.data.fd, &ev) == -1)
- {
- Log() << "Unable to delete fd " << ev.data.fd << " from socketengine epoll: " << Anope::LastError();
- return;
- }
+ throw SocketException("Unable to remove fd " + stringify(ev.data.fd) + " from epoll: " + Anope::LastError());
Sockets.erase(ev.data.fd);
}
@@ -91,9 +79,9 @@ void SocketEngine::MarkWritable(Socket *s)
ev.data.fd = s->GetFD();
if (epoll_ctl(EngineHandle, EPOLL_CTL_MOD, ev.data.fd, &ev) == -1)
- Log() << "Unable to mark fd " << ev.data.fd << " as writable in socketengine epoll: " << Anope::LastError();
- else
- s->SetFlag(SF_WRITABLE);
+ throw SocketException("Unable to mark fd " + stringify(ev.data.fd) + " as writable in epoll: " + Anope::LastError());
+
+ s->SetFlag(SF_WRITABLE);
}
void SocketEngine::ClearWritable(Socket *s)
@@ -109,9 +97,9 @@ void SocketEngine::ClearWritable(Socket *s)
ev.data.fd = s->GetFD();
if (epoll_ctl(EngineHandle, EPOLL_CTL_MOD, ev.data.fd, &ev) == -1)
- Log() << "Unable to mark fd " << ev.data.fd << " as unwritable in socketengine epoll: " << Anope::LastError();
- else
- s->UnsetFlag(SF_WRITABLE);
+ throw SocketException("Unable clear mark fd " + stringify(ev.data.fd) + " as writable in epoll: " + Anope::LastError());
+
+ s->UnsetFlag(SF_WRITABLE);
}
void SocketEngine::Process()
@@ -130,10 +118,15 @@ void SocketEngine::Process()
for (int i = 0; i < total; ++i)
{
epoll_event *ev = &events[i];
- Socket *s = Sockets[ev->data.fd];
+
+ std::map<int, Socket *>::iterator it = Sockets.find(ev->data.fd);
+ if (it == Sockets.end())
+ continue;
+ Socket *s = it->second;
if (s->HasFlag(SF_DEAD))
continue;
+
if (ev->events & (EPOLLHUP | EPOLLERR))
{
s->ProcessError();
@@ -141,6 +134,9 @@ void SocketEngine::Process()
continue;
}
+ if (!s->Process())
+ continue;
+
if ((ev->events & EPOLLIN) && !s->ProcessRead())
s->SetFlag(SF_DEAD);
@@ -151,7 +147,10 @@ void SocketEngine::Process()
for (int i = 0; i < total; ++i)
{
epoll_event *ev = &events[i];
- Socket *s = Sockets[ev->data.fd];
+ std::map<int, Socket *>::iterator it = Sockets.find(ev->data.fd);
+ if (it == Sockets.end())
+ continue;
+ Socket *s = it->second;
if (s->HasFlag(SF_DEAD))
delete s;
diff --git a/src/socketengines/socketengine_poll.cpp b/src/socketengines/socketengine_poll.cpp
index 278485fa7..ba111659a 100644
--- a/src/socketengines/socketengine_poll.cpp
+++ b/src/socketengines/socketengine_poll.cpp
@@ -50,10 +50,7 @@ void SocketEngine::Shutdown()
void SocketEngine::AddSocket(Socket *s)
{
if (SocketCount == max)
- {
- Log() << "Unable to add fd " << s->GetFD() << " to socketengine poll, engine is full";
- return;
- }
+ throw SocketException("Unable to add fd " + stringify(s->GetFD()) + " to poll: " + Anope::LastError());
pollfd *ev = &events[SocketCount];
ev->fd = s->GetFD();
@@ -70,10 +67,7 @@ void SocketEngine::DelSocket(Socket *s)
{
std::map<int, int>::iterator pos = socket_positions.find(s->GetFD());
if (pos == socket_positions.end())
- {
- Log() << "Unable to delete unknown fd " << s->GetFD() << " from socketengine poll";
- return;
- }
+ throw SocketException("Unable to remove fd " + stringify(s->GetFD()) + " from poll");
if (pos->second != SocketCount - 1)
{
@@ -100,10 +94,7 @@ void SocketEngine::MarkWritable(Socket *s)
std::map<int, int>::iterator pos = socket_positions.find(s->GetFD());
if (pos == socket_positions.end())
- {
- Log() << "Unable to mark unknown fd " << s->GetFD() << " as writable";
- return;
- }
+ throw SocketException("Unable to mark fd " + stringify(s->GetFD()) + " as writable in poll");
pollfd *ev = &events[pos->second];
ev->events |= POLLOUT;
@@ -118,10 +109,7 @@ void SocketEngine::ClearWritable(Socket *s)
std::map<int, int>::iterator pos = socket_positions.find(s->GetFD());
if (pos == socket_positions.end())
- {
- Log() << "Unable to mark unknown fd " << s->GetFD() << " as writable";
- return;
- }
+ throw SocketException("Unable clear mark fd " + stringify(s->GetFD()) + " as writable in poll");
pollfd *ev = &events[pos->second];
ev->events &= ~POLLOUT;
@@ -149,10 +137,14 @@ void SocketEngine::Process()
if (ev->revents != 0)
++processed;
- Socket *s = Sockets[ev->fd];
+ std::map<int, Socket *>::iterator it = Sockets.find(ev->fd);
+ if (it == Sockets.end())
+ continue;
+ Socket *s = it->second;
if (s->HasFlag(SF_DEAD))
continue;
+
if (ev->revents & (POLLERR | POLLRDHUP))
{
s->ProcessError();
@@ -160,6 +152,9 @@ void SocketEngine::Process()
continue;
}
+ if (!s->Process())
+ continue;
+
if ((ev->revents & POLLIN) && !s->ProcessRead())
s->SetFlag(SF_DEAD);
@@ -170,7 +165,10 @@ void SocketEngine::Process()
for (int i = 0; i < SocketCount; ++i)
{
pollfd *ev = &events[i];
- Socket *s = Sockets[ev->fd];
+ std::map<int, Socket *>::iterator it = Sockets.find(ev->fd);
+ if (it == Sockets.end())
+ continue;
+ Socket *s = it->second;
if (s->HasFlag(SF_DEAD))
delete s;
diff --git a/src/socketengines/socketengine_select.cpp b/src/socketengines/socketengine_select.cpp
index 34a86f680..ad6e04ca9 100644
--- a/src/socketengines/socketengine_select.cpp
+++ b/src/socketengines/socketengine_select.cpp
@@ -96,19 +96,27 @@ void SocketEngine::Process()
{
Socket *s = it->second;
- if (FD_ISSET(s->GetFD(), &efdset) || FD_ISSET(s->GetFD(), &rfdset) || FD_ISSET(s->GetFD(), &wfdset))
+ bool has_read = FD_ISSET(s->GetFD(), &rfdset), has_write = FD_ISSET(s->GetFD(), &wfdset), has_error = FD_ISSET(s->GetFD(), &efdset);
+ if (has_read || has_write || has_error)
++processed;
+
if (s->HasFlag(SF_DEAD))
continue;
- if (FD_ISSET(s->GetFD(), &efdset))
+
+ if (has_error)
{
s->ProcessError();
s->SetFlag(SF_DEAD);
continue;
}
- if (FD_ISSET(s->GetFD(), &rfdset) && !s->ProcessRead())
+
+ if (!s->Process())
+ continue;
+
+ if (has_read && !s->ProcessRead())
s->SetFlag(SF_DEAD);
- if (FD_ISSET(s->GetFD(), &wfdset) && !s->ProcessWrite())
+
+ if (has_write && !s->ProcessWrite())
s->SetFlag(SF_DEAD);
}