diff options
author | Adam <Adam@anope.org> | 2011-08-21 13:38:42 -0400 |
---|---|---|
committer | Adam <Adam@anope.org> | 2011-09-10 01:55:09 -0400 |
commit | 2eb708e5ad8b259876d24d828f7472b77864c256 (patch) | |
tree | bed6b70d4bc67eb413453a116e77f8f724cdf3fd /src/socketengines | |
parent | 4fcb371bc8813cd647b7769a64d586e3a57d684d (diff) |
Cleaned up some of the socket code, cleaned up the pipe engines, added support for binary sockets, and cleaned up the asynch connect/accept code
Diffstat (limited to 'src/socketengines')
-rw-r--r-- | src/socketengines/pipeengine_eventfd.cpp | 49 | ||||
-rw-r--r-- | src/socketengines/pipeengine_pipe.cpp | 49 | ||||
-rw-r--r-- | src/socketengines/pipeengine_win32.cpp | 83 | ||||
-rw-r--r-- | src/socketengines/socketengine_epoll.cpp | 47 | ||||
-rw-r--r-- | src/socketengines/socketengine_poll.cpp | 34 | ||||
-rw-r--r-- | src/socketengines/socketengine_select.cpp | 16 |
6 files changed, 91 insertions, 187 deletions
diff --git a/src/socketengines/pipeengine_eventfd.cpp b/src/socketengines/pipeengine_eventfd.cpp index aa712de3d..21931bb1b 100644 --- a/src/socketengines/pipeengine_eventfd.cpp +++ b/src/socketengines/pipeengine_eventfd.cpp @@ -1,64 +1,29 @@ #include "services.h" #include <sys/eventfd.h> -class PipeIO : public SocketIO +Pipe::Pipe() : Socket(eventfd(0, EFD_NONBLOCK)) { - public: - /** Receive something from the buffer - * @param s The socket - * @param buf The buf to read to - * @param sz How much to read - * @return Number of bytes received - */ - int Recv(Socket *s, char *buf, size_t sz) - { - static eventfd_t dummy; - return !eventfd_read(s->GetFD(), &dummy); - } - - /** Write something to the socket - * @param s The socket - * @param buf What to write - * @return Number of bytes written - */ - int Send(Socket *s, const Anope::string &buf) - { - return !eventfd_write(s->GetFD(), 1); - } -} pipeSocketIO; - -Pipe::Pipe() : BufferedSocket() -{ - this->IO = &pipeSocketIO; - this->Sock = eventfd(0, EFD_NONBLOCK); if (this->Sock < 0) - throw CoreException(Anope::string("Could not create pipe: ") + Anope::LastError()); - - this->IPv6 = false; + throw CoreException("Could not create pipe: " + Anope::LastError()); SocketEngine::AddSocket(this); } -bool Pipe::ProcessRead() +Pipe::~Pipe() { - this->IO->Recv(this, NULL, 0); - return this->Read(""); } -bool Pipe::Read(const Anope::string &) +bool Pipe::ProcessRead() { + eventfd_t dummy; + eventfd_read(this->GetFD(), &dummy); this->OnNotify(); return true; } void Pipe::Notify() { - /* Note we send this immediatly. If use use Socket::Write and if this functions is called - * from a thread, only epoll is able to pick up the change to this sockets want flags immediately - * Other engines time out then pick up and write the change then read it back, which - * is too slow for most things. - */ - this->IO->Send(this, ""); + eventfd_write(this->GetFD(), 1); } void Pipe::OnNotify() diff --git a/src/socketengines/pipeengine_pipe.cpp b/src/socketengines/pipeengine_pipe.cpp index b3ca15d87..ceaded8ea 100644 --- a/src/socketengines/pipeengine_pipe.cpp +++ b/src/socketengines/pipeengine_pipe.cpp @@ -1,67 +1,40 @@ #include "services.h" -class PipeIO : public SocketIO -{ - public: - /** Receive something from the buffer - * @param s The socket - * @param buf The buf to read to - * @param sz How much to read - * @return Number of bytes received - */ - int Recv(Socket *s, char *buf, size_t sz) - { - static char dummy[512]; - while (read(s->GetFD(), &dummy, 512) == 512); - return 0; - } - - /** Write something to the socket - * @param s The socket - * @param buf What to write - * @return Number of bytes written - */ - int Send(Socket *s, const Anope::string &buf) - { - static const char dummy = '*'; - Pipe *pipe = debug_cast<Pipe *>(s); - return write(pipe->WritePipe, &dummy, 1); - } -} pipeSocketIO; - -Pipe::Pipe() : BufferedSocket() +Pipe::Pipe() : Socket(-1) { int fds[2]; if (pipe(fds)) - throw CoreException(Anope::string("Could not create pipe: ") + Anope::LastError()); + throw CoreException("Could not create pipe: " + Anope::LastError()); int flags = fcntl(fds[0], F_GETFL, 0); fcntl(fds[0], F_SETFL, flags | O_NONBLOCK); flags = fcntl(fds[1], F_GETFL, 0); fcntl(fds[1], F_SETFL, flags | O_NONBLOCK); - this->IO = &pipeSocketIO; + this->~Socket(); + this->Sock = fds[0]; this->WritePipe = fds[1]; - this->IPv6 = false; SocketEngine::AddSocket(this); } -bool Pipe::ProcessRead() +Pipe::~Pipe() { - this->IO->Recv(this, NULL, 0); - return this->Read(""); + CloseSocket(this->WritePipe); } -bool Pipe::Read(const Anope::string &) +bool Pipe::ProcessRead() { + char dummy[512]; + while (read(this->GetFD(), &dummy, 512) == 512); this->OnNotify(); return true; } void Pipe::Notify() { - this->IO->Send(this, ""); + const char dummy = '*'; + write(this->WritePipe, &dummy, 1); } void Pipe::OnNotify() diff --git a/src/socketengines/pipeengine_win32.cpp b/src/socketengines/pipeengine_win32.cpp index 3bd18d0cd..f34472870 100644 --- a/src/socketengines/pipeengine_win32.cpp +++ b/src/socketengines/pipeengine_win32.cpp @@ -1,89 +1,50 @@ #include "services.h" -static ClientSocket *newsocket = NULL; - -class LSocket : public ListenSocket -{ - public: - LSocket(const Anope::string &host, int port) : ListenSocket(host, port, false) { } - - ClientSocket *OnAccept(int fd, const sockaddrs &addr) - { - newsocket = new ClientSocket(this, fd, addr); - return newsocket; - } -}; - -class PipeIO : public SocketIO +Pipe::Pipe() : Socket(-1) { - public: - /** Receive something from the buffer - * @param s The socket - * @param buf The buf to read to - * @param sz How much to read - * @return Number of bytes received - */ - int Recv(Socket *s, char *buf, size_t sz) - { - static char dummy[512]; - return recv(s->GetFD(), dummy, 512, 0); - } - - /** Write something to the socket - * @param s The socket - * @param buf What to write - * @return Number of bytes written - */ - int Send(Socket *s, const Anope::string &buf) - { - static const char dummy = '*'; - Pipe *pipe = debug_cast<Pipe *>(s); - return send(pipe->WritePipe, &dummy, 1, 0); - } -} pipeSocketIO; + sockaddrs localhost; -Pipe::Pipe() : BufferedSocket() -{ - LSocket lfs("127.0.0.1", 0); + localhost.pton(AF_INET, "127.0.0.1"); - int cfd = socket(AF_INET, SOCK_STREAM, 0); + int cfd = socket(AF_INET, SOCK_STREAM, 0), lfd = socket(AF_INET, SOCK_STREAM, 0); if (cfd == -1) throw CoreException("Error accepting new socket for Pipe"); + + if (bind(lfd, &localhost.sa, localhost.size()) == -1) + throw CoreException("Error accepting new socket for Pipe"); + if (listen(lfd, 1) == -1) + throw CoreException("Error accepting new socket for Pipe"); - sockaddr_in addr; - socklen_t sz = sizeof(addr); - getsockname(lfs.GetFD(), reinterpret_cast<sockaddr *>(&addr), &sz); + sockaddrs lfd_addr; + socklen_t sz = sizeof(lfd_addr); + getsockname(lfd, &lfd_addr.sa, &sz); - if (connect(cfd, reinterpret_cast<sockaddr *>(&addr), sz)) - throw CoreException("Error accepting new socket for Pipe"); - lfs.ProcessRead(); - if (!newsocket) + if (connect(cfd, &lfd_addr.sa, lfd_addr.size())) throw CoreException("Error accepting new socket for Pipe"); + CloseSocket(lfd); - this->IO = &pipeSocketIO; - this->Sock = cfd; - this->WritePipe = newsocket->GetFD(); - this->IPv6 = false; + this->WritePipe = cfd; SocketEngine::AddSocket(this); - newsocket = NULL; } -bool Pipe::ProcessRead() +Pipe::~Pipe() { - this->IO->Recv(this, NULL, 0); - return this->Read(""); + CloseSocket(this->WritePipe); } -bool Pipe::Read(const Anope::string &) +bool Pipe::ProcessRead() { + char dummy[512]; + while (recv(this->GetFD(), dummy, 512, 0) == 512); this->OnNotify(); return true; } void Pipe::Notify() { - this->IO->Send(this, ""); + const char dummy = '*'; + send(this->WritePipe, &dummy, 1, 0); } void Pipe::OnNotify() diff --git a/src/socketengines/socketengine_epoll.cpp b/src/socketengines/socketengine_epoll.cpp index ad9bef3ce..73f7e10d6 100644 --- a/src/socketengines/socketengine_epoll.cpp +++ b/src/socketengines/socketengine_epoll.cpp @@ -11,18 +11,12 @@ void SocketEngine::Init() max = ulimit(4, 0); if (max <= 0) - { - Log() << "Can't determine maximum number of open sockets"; - throw CoreException("Can't determine maximum number of open sockets"); - } + throw SocketException("Can't determine maximum number of open sockets"); EngineHandle = epoll_create(max / 4); if (EngineHandle == -1) - { - Log() << "Could not initialize epoll socket engine: " << Anope::LastError(); - throw CoreException(Anope::string("Could not initialize epoll socket engine: ") + Anope::LastError()); - } + throw SocketException("Could not initialize epoll socket engine: " + Anope::LastError()); events = new epoll_event[max]; memset(events, 0, sizeof(epoll_event) * max); @@ -53,10 +47,7 @@ void SocketEngine::AddSocket(Socket *s) ev.data.fd = s->GetFD(); if (epoll_ctl(EngineHandle, EPOLL_CTL_ADD, ev.data.fd, &ev) == -1) - { - Log() << "Unable to add fd " << ev.data.fd << " to socketengine epoll: " << Anope::LastError(); - return; - } + throw SocketException("Unable to add fd " + stringify(ev.data.fd) + " to epoll: " + Anope::LastError()); Sockets.insert(std::make_pair(ev.data.fd, s)); } @@ -70,10 +61,7 @@ void SocketEngine::DelSocket(Socket *s) ev.data.fd = s->GetFD(); if (epoll_ctl(EngineHandle, EPOLL_CTL_DEL, ev.data.fd, &ev) == -1) - { - Log() << "Unable to delete fd " << ev.data.fd << " from socketengine epoll: " << Anope::LastError(); - return; - } + throw SocketException("Unable to remove fd " + stringify(ev.data.fd) + " from epoll: " + Anope::LastError()); Sockets.erase(ev.data.fd); } @@ -91,9 +79,9 @@ void SocketEngine::MarkWritable(Socket *s) ev.data.fd = s->GetFD(); if (epoll_ctl(EngineHandle, EPOLL_CTL_MOD, ev.data.fd, &ev) == -1) - Log() << "Unable to mark fd " << ev.data.fd << " as writable in socketengine epoll: " << Anope::LastError(); - else - s->SetFlag(SF_WRITABLE); + throw SocketException("Unable to mark fd " + stringify(ev.data.fd) + " as writable in epoll: " + Anope::LastError()); + + s->SetFlag(SF_WRITABLE); } void SocketEngine::ClearWritable(Socket *s) @@ -109,9 +97,9 @@ void SocketEngine::ClearWritable(Socket *s) ev.data.fd = s->GetFD(); if (epoll_ctl(EngineHandle, EPOLL_CTL_MOD, ev.data.fd, &ev) == -1) - Log() << "Unable to mark fd " << ev.data.fd << " as unwritable in socketengine epoll: " << Anope::LastError(); - else - s->UnsetFlag(SF_WRITABLE); + throw SocketException("Unable clear mark fd " + stringify(ev.data.fd) + " as writable in epoll: " + Anope::LastError()); + + s->UnsetFlag(SF_WRITABLE); } void SocketEngine::Process() @@ -130,10 +118,15 @@ void SocketEngine::Process() for (int i = 0; i < total; ++i) { epoll_event *ev = &events[i]; - Socket *s = Sockets[ev->data.fd]; + + std::map<int, Socket *>::iterator it = Sockets.find(ev->data.fd); + if (it == Sockets.end()) + continue; + Socket *s = it->second; if (s->HasFlag(SF_DEAD)) continue; + if (ev->events & (EPOLLHUP | EPOLLERR)) { s->ProcessError(); @@ -141,6 +134,9 @@ void SocketEngine::Process() continue; } + if (!s->Process()) + continue; + if ((ev->events & EPOLLIN) && !s->ProcessRead()) s->SetFlag(SF_DEAD); @@ -151,7 +147,10 @@ void SocketEngine::Process() for (int i = 0; i < total; ++i) { epoll_event *ev = &events[i]; - Socket *s = Sockets[ev->data.fd]; + std::map<int, Socket *>::iterator it = Sockets.find(ev->data.fd); + if (it == Sockets.end()) + continue; + Socket *s = it->second; if (s->HasFlag(SF_DEAD)) delete s; diff --git a/src/socketengines/socketengine_poll.cpp b/src/socketengines/socketengine_poll.cpp index 278485fa7..ba111659a 100644 --- a/src/socketengines/socketengine_poll.cpp +++ b/src/socketengines/socketengine_poll.cpp @@ -50,10 +50,7 @@ void SocketEngine::Shutdown() void SocketEngine::AddSocket(Socket *s) { if (SocketCount == max) - { - Log() << "Unable to add fd " << s->GetFD() << " to socketengine poll, engine is full"; - return; - } + throw SocketException("Unable to add fd " + stringify(s->GetFD()) + " to poll: " + Anope::LastError()); pollfd *ev = &events[SocketCount]; ev->fd = s->GetFD(); @@ -70,10 +67,7 @@ void SocketEngine::DelSocket(Socket *s) { std::map<int, int>::iterator pos = socket_positions.find(s->GetFD()); if (pos == socket_positions.end()) - { - Log() << "Unable to delete unknown fd " << s->GetFD() << " from socketengine poll"; - return; - } + throw SocketException("Unable to remove fd " + stringify(s->GetFD()) + " from poll"); if (pos->second != SocketCount - 1) { @@ -100,10 +94,7 @@ void SocketEngine::MarkWritable(Socket *s) std::map<int, int>::iterator pos = socket_positions.find(s->GetFD()); if (pos == socket_positions.end()) - { - Log() << "Unable to mark unknown fd " << s->GetFD() << " as writable"; - return; - } + throw SocketException("Unable to mark fd " + stringify(s->GetFD()) + " as writable in poll"); pollfd *ev = &events[pos->second]; ev->events |= POLLOUT; @@ -118,10 +109,7 @@ void SocketEngine::ClearWritable(Socket *s) std::map<int, int>::iterator pos = socket_positions.find(s->GetFD()); if (pos == socket_positions.end()) - { - Log() << "Unable to mark unknown fd " << s->GetFD() << " as writable"; - return; - } + throw SocketException("Unable clear mark fd " + stringify(s->GetFD()) + " as writable in poll"); pollfd *ev = &events[pos->second]; ev->events &= ~POLLOUT; @@ -149,10 +137,14 @@ void SocketEngine::Process() if (ev->revents != 0) ++processed; - Socket *s = Sockets[ev->fd]; + std::map<int, Socket *>::iterator it = Sockets.find(ev->fd); + if (it == Sockets.end()) + continue; + Socket *s = it->second; if (s->HasFlag(SF_DEAD)) continue; + if (ev->revents & (POLLERR | POLLRDHUP)) { s->ProcessError(); @@ -160,6 +152,9 @@ void SocketEngine::Process() continue; } + if (!s->Process()) + continue; + if ((ev->revents & POLLIN) && !s->ProcessRead()) s->SetFlag(SF_DEAD); @@ -170,7 +165,10 @@ void SocketEngine::Process() for (int i = 0; i < SocketCount; ++i) { pollfd *ev = &events[i]; - Socket *s = Sockets[ev->fd]; + std::map<int, Socket *>::iterator it = Sockets.find(ev->fd); + if (it == Sockets.end()) + continue; + Socket *s = it->second; if (s->HasFlag(SF_DEAD)) delete s; diff --git a/src/socketengines/socketengine_select.cpp b/src/socketengines/socketengine_select.cpp index 34a86f680..ad6e04ca9 100644 --- a/src/socketengines/socketengine_select.cpp +++ b/src/socketengines/socketengine_select.cpp @@ -96,19 +96,27 @@ void SocketEngine::Process() { Socket *s = it->second; - if (FD_ISSET(s->GetFD(), &efdset) || FD_ISSET(s->GetFD(), &rfdset) || FD_ISSET(s->GetFD(), &wfdset)) + bool has_read = FD_ISSET(s->GetFD(), &rfdset), has_write = FD_ISSET(s->GetFD(), &wfdset), has_error = FD_ISSET(s->GetFD(), &efdset); + if (has_read || has_write || has_error) ++processed; + if (s->HasFlag(SF_DEAD)) continue; - if (FD_ISSET(s->GetFD(), &efdset)) + + if (has_error) { s->ProcessError(); s->SetFlag(SF_DEAD); continue; } - if (FD_ISSET(s->GetFD(), &rfdset) && !s->ProcessRead()) + + if (!s->Process()) + continue; + + if (has_read && !s->ProcessRead()) s->SetFlag(SF_DEAD); - if (FD_ISSET(s->GetFD(), &wfdset) && !s->ProcessWrite()) + + if (has_write && !s->ProcessWrite()) s->SetFlag(SF_DEAD); } |