diff options
author | Adam <Adam@anope.org> | 2011-04-26 19:13:51 -0400 |
---|---|---|
committer | Adam <Adam@anope.org> | 2011-05-16 04:08:47 -0400 |
commit | e7887c1f013248274574ab8e3167f742ccb3d69b (patch) | |
tree | f9f5959512b7129711f03156320ae0e46cabaec3 /src/socketengines | |
parent | 076ebafa1b4cc935c466c615b94eaac415af9a67 (diff) |
Unmodularized the socket engine because its causing problems and really is unnecessary
Diffstat (limited to 'src/socketengines')
-rw-r--r-- | src/socketengines/pipeengine_eventfd.cpp (renamed from src/socketengines/socketengine_eventfd.cpp) | 2 | ||||
-rw-r--r-- | src/socketengines/pipeengine_pipe.cpp (renamed from src/socketengines/socketengine_pipe.cpp) | 0 | ||||
-rw-r--r-- | src/socketengines/pipeengine_win32.cpp (renamed from src/socketengines/socketengine_win32.cpp) | 2 | ||||
-rw-r--r-- | src/socketengines/socketengine_epoll.cpp | 154 | ||||
-rw-r--r-- | src/socketengines/socketengine_poll.cpp | 176 | ||||
-rw-r--r-- | src/socketengines/socketengine_select.cpp | 105 |
6 files changed, 437 insertions, 2 deletions
diff --git a/src/socketengines/socketengine_eventfd.cpp b/src/socketengines/pipeengine_eventfd.cpp index 9e9ec677b..713c73d34 100644 --- a/src/socketengines/socketengine_eventfd.cpp +++ b/src/socketengines/pipeengine_eventfd.cpp @@ -36,7 +36,7 @@ Pipe::Pipe() : BufferedSocket() this->IPv6 = false; - SocketEngine->AddSocket(this); + SocketEngine::AddSocket(this); } bool Pipe::ProcessRead() diff --git a/src/socketengines/socketengine_pipe.cpp b/src/socketengines/pipeengine_pipe.cpp index 9d5b47154..9d5b47154 100644 --- a/src/socketengines/socketengine_pipe.cpp +++ b/src/socketengines/pipeengine_pipe.cpp diff --git a/src/socketengines/socketengine_win32.cpp b/src/socketengines/pipeengine_win32.cpp index 78d4bf92d..41ec37a4c 100644 --- a/src/socketengines/socketengine_win32.cpp +++ b/src/socketengines/pipeengine_win32.cpp @@ -65,7 +65,7 @@ Pipe::Pipe() : BufferedSocket() this->WritePipe = newsocket->GetFD(); this->IPv6 = false; - SocketEngine->AddSocket(this); + SocketEngine::AddSocket(this); newsocket = NULL; } diff --git a/src/socketengines/socketengine_epoll.cpp b/src/socketengines/socketengine_epoll.cpp new file mode 100644 index 000000000..0ced8e71e --- /dev/null +++ b/src/socketengines/socketengine_epoll.cpp @@ -0,0 +1,154 @@ +#include "module.h" +#include <sys/epoll.h> +#include <ulimit.h> + +static long max; +static int EngineHandle; +static epoll_event *events; + +void SocketEngine::Init() +{ + max = ulimit(4, 0); + + if (max <= 0) + { + Log() << "Can't determine maximum number of open sockets"; + throw CoreException("Can't determine maximum number of open sockets"); + } + + EngineHandle = epoll_create(max / 4); + + if (EngineHandle == -1) + { + Log() << "Could not initialize epoll socket engine: " << Anope::LastError(); + throw CoreException(Anope::string("Could not initialize epoll socket engine: ") + Anope::LastError()); + } + + events = new epoll_event[max]; + memset(events, 0, sizeof(epoll_event) * max); +} + +void SocketEngine::Shutdown() +{ + for (std::map<int, Socket *>::const_iterator it = Sockets.begin(), it_end = Sockets.end(); it != it_end; ++it) + delete it->second; + Sockets.clear(); + + delete [] events; +} + +void SocketEngine::AddSocket(Socket *s) +{ + epoll_event ev; + + memset(&ev, 0, sizeof(ev)); + + ev.events = EPOLLIN; + ev.data.fd = s->GetFD(); + + if (epoll_ctl(EngineHandle, EPOLL_CTL_ADD, ev.data.fd, &ev) == -1) + { + Log() << "Unable to add fd " << ev.data.fd << " to socketengine epoll: " << Anope::LastError(); + return; + } + + Sockets.insert(std::make_pair(ev.data.fd, s)); +} + +void SocketEngine::DelSocket(Socket *s) +{ + epoll_event ev; + + memset(&ev, 0, sizeof(ev)); + + ev.data.fd = s->GetFD(); + + if (epoll_ctl(EngineHandle, EPOLL_CTL_DEL, ev.data.fd, &ev) == -1) + { + Log() << "Unable to delete fd " << ev.data.fd << " from socketengine epoll: " << Anope::LastError(); + return; + } + + Sockets.erase(ev.data.fd); +} + +void SocketEngine::MarkWritable(Socket *s) +{ + if (s->HasFlag(SF_WRITABLE)) + return; + + epoll_event ev; + + memset(&ev, 0, sizeof(ev)); + + ev.events = EPOLLIN | EPOLLOUT; + ev.data.fd = s->GetFD(); + + if (epoll_ctl(EngineHandle, EPOLL_CTL_MOD, ev.data.fd, &ev) == -1) + Log() << "Unable to mark fd " << ev.data.fd << " as writable in socketengine epoll: " << Anope::LastError(); + else + s->SetFlag(SF_WRITABLE); +} + +void SocketEngine::ClearWritable(Socket *s) +{ + if (!s->HasFlag(SF_WRITABLE)) + return; + + epoll_event ev; + + memset(&ev, 0, sizeof(ev)); + + ev.events = EPOLLIN; + ev.data.fd = s->GetFD(); + + if (epoll_ctl(EngineHandle, EPOLL_CTL_MOD, ev.data.fd, &ev) == -1) + Log() << "Unable to mark fd " << ev.data.fd << " as unwritable in socketengine epoll: " << Anope::LastError(); + else + s->UnsetFlag(SF_WRITABLE); +} + +void SocketEngine::Process() +{ + int total = epoll_wait(EngineHandle, events, max - 1, Config->ReadTimeout * 1000); + Anope::CurTime = time(NULL); + + /* EINTR can be given if the read timeout expires */ + if (total == -1) + { + if (errno != EINTR) + Log() << "SockEngine::Process(): error: " << Anope::LastError(); + return; + } + + for (int i = 0; i < total; ++i) + { + epoll_event *ev = &events[i]; + Socket *s = Sockets[ev->data.fd]; + + if (s->HasFlag(SF_DEAD)) + continue; + if (ev->events & (EPOLLHUP | EPOLLERR)) + { + s->ProcessError(); + s->SetFlag(SF_DEAD); + continue; + } + + if ((ev->events & EPOLLIN) && !s->ProcessRead()) + s->SetFlag(SF_DEAD); + + if ((ev->events & EPOLLOUT) && !s->ProcessWrite()) + s->SetFlag(SF_DEAD); + } + + for (int i = 0; i < total; ++i) + { + epoll_event *ev = &events[i]; + Socket *s = Sockets[ev->data.fd]; + + if (s->HasFlag(SF_DEAD)) + delete s; + } +} + diff --git a/src/socketengines/socketengine_poll.cpp b/src/socketengines/socketengine_poll.cpp new file mode 100644 index 000000000..b8eef91b8 --- /dev/null +++ b/src/socketengines/socketengine_poll.cpp @@ -0,0 +1,176 @@ +#include "module.h" + +#ifndef _WIN32 +# include <ulimit.h> +# include <sys/poll.h> +# include <poll.h> +# ifndef POLLRDHUP +# define POLLRDHUP 0 +# endif +#else +# define poll WSAPoll +# define POLLRDHUP POLLHUP +#endif + +static long max; +static pollfd *events; +static int SocketCount; +static std::map<int, int> socket_positions; + +void SocketEngine::Init() +{ + SocketCount = 0; +#ifndef _WIN32 + max = ulimit(4, 0); +#else + max = 1024; +#endif + + if (max <= 0) + { + Log() << "Can't determine maximum number of open sockets"; + throw CoreException("Can't determine maximum number of open sockets"); + } + + events = new pollfd[max]; +} + +void SocketEngine::Shutdown() +{ + for (std::map<int, Socket *>::const_iterator it = Sockets.begin(), it_end = Sockets.end(); it != it_end; ++it) + delete it->second; + Sockets.clear(); + + delete [] events; +} + +void SocketEngine::AddSocket(Socket *s) +{ + if (SocketCount == max) + { + Log() << "Unable to add fd " << s->GetFD() << " to socketengine poll, engine is full"; + return; + } + + pollfd *ev = &events[SocketCount]; + ev->fd = s->GetFD(); + ev->events = POLLIN; + ev->revents = 0; + + Sockets.insert(std::make_pair(ev->fd, s)); + socket_positions.insert(std::make_pair(ev->fd, SocketCount)); + + ++SocketCount; +} + +void SocketEngine::DelSocket(Socket *s) +{ + std::map<int, int>::iterator pos = socket_positions.find(s->GetFD()); + if (pos == socket_positions.end()) + { + Log() << "Unable to delete unknown fd " << s->GetFD() << " from socketengine poll"; + return; + } + + if (pos->second != SocketCount - 1) + { + pollfd *ev = &events[pos->second], + *last_ev = &events[SocketCount - 1]; + + ev->fd = last_ev->fd; + ev->events = last_ev->events; + ev->revents = last_ev->revents; + + socket_positions[ev->fd] = pos->second; + } + + Sockets.erase(s->GetFD()); + socket_positions.erase(pos); + + --SocketCount; +} + +void SocketEngine::MarkWritable(Socket *s) +{ + if (s->HasFlag(SF_WRITABLE)) + return; + + std::map<int, int>::iterator pos = socket_positions.find(s->GetFD()); + if (pos == socket_positions.end()) + { + Log() << "Unable to mark unknown fd " << s->GetFD() << " as writable"; + return; + } + + pollfd *ev = &events[pos->second]; + ev->events |= POLLOUT; + + s->SetFlag(SF_WRITABLE); +} + +void SocketEngine::ClearWritable(Socket *s) +{ + if (!s->HasFlag(SF_WRITABLE)) + return; + + std::map<int, int>::iterator pos = socket_positions.find(s->GetFD()); + if (pos == socket_positions.end()) + { + Log() << "Unable to mark unknown fd " << s->GetFD() << " as writable"; + return; + } + + pollfd *ev = &events[pos->second]; + ev->events &= ~POLLOUT; + + s->UnsetFlag(SF_WRITABLE); +} + +void SocketEngine::Process() +{ + int total = poll(events, SocketCount, Config->ReadTimeout * 1000); + Anope::CurTime = time(NULL); + + /* EINTR can be given if the read timeout expires */ + if (total == -1) + { + if (errno != EINTR) + Log() << "SockEngine::Process(): error: " << Anope::LastError(); + return; + } + + for (int i = 0, processed = 0; i < SocketCount && processed != total; ++i) + { + pollfd *ev = &events[i]; + + if (ev->revents != 0) + ++processed; + + Socket *s = Sockets[ev->fd]; + + if (s->HasFlag(SF_DEAD)) + continue; + if (ev->revents & (POLLERR | POLLRDHUP)) + { + s->ProcessError(); + s->SetFlag(SF_DEAD); + continue; + } + + if ((ev->revents & POLLIN) && !s->ProcessRead()) + s->SetFlag(SF_DEAD); + + if ((ev->revents & POLLOUT) && !s->ProcessWrite()) + s->SetFlag(SF_DEAD); + } + + for (int i = 0; i < SocketCount; ++i) + { + pollfd *ev = &events[i]; + Socket *s = Sockets[ev->fd]; + + if (s->HasFlag(SF_DEAD)) + delete s; + } +} + diff --git a/src/socketengines/socketengine_select.cpp b/src/socketengines/socketengine_select.cpp new file mode 100644 index 000000000..43b9b17cc --- /dev/null +++ b/src/socketengines/socketengine_select.cpp @@ -0,0 +1,105 @@ +#include "module.h" + +static int MaxFD; +static fd_set ReadFDs; +static fd_set WriteFDs; + +void SocketEngine::Init() +{ + MaxFD = 0; + FD_ZERO(&ReadFDs); + FD_ZERO(&WriteFDs); +} + +void SocketEngine::Shutdown() +{ + for (std::map<int, Socket *>::const_iterator it = Sockets.begin(), it_end = Sockets.end(); it != it_end; ++it) + delete it->second; + Sockets.clear(); + + MaxFD = 0; + FD_ZERO(&ReadFDs); + FD_ZERO(&WriteFDs); +} + +void SocketEngine::AddSocket(Socket *s) +{ + if (s->GetFD() > MaxFD) + MaxFD = s->GetFD(); + FD_SET(s->GetFD(), &ReadFDs); + Sockets.insert(std::make_pair(s->GetFD(), s)); +} + +void SocketEngine::DelSocket(Socket *s) +{ + if (s->GetFD() == MaxFD) + --MaxFD; + FD_CLR(s->GetFD(), &ReadFDs); + FD_CLR(s->GetFD(), &WriteFDs); + Sockets.erase(s->GetFD()); +} + +void SocketEngine::MarkWritable(Socket *s) +{ + if (s->HasFlag(SF_WRITABLE)) + return; + FD_SET(s->GetFD(), &WriteFDs); + s->SetFlag(SF_WRITABLE); +} + +void SocketEngine::ClearWritable(Socket *s) +{ + if (!s->HasFlag(SF_WRITABLE)) + return; + FD_CLR(s->GetFD(), &WriteFDs); + s->UnsetFlag(SF_WRITABLE); +} + +void SocketEngine::Process() +{ + fd_set rfdset = ReadFDs, wfdset = WriteFDs, efdset = ReadFDs; + timeval tval; + tval.tv_sec = Config->ReadTimeout; + tval.tv_usec = 0; + + int sresult = select(MaxFD + 1, &rfdset, &wfdset, &efdset, &tval); + Anope::CurTime = time(NULL); + + if (sresult == -1) + { + Log() << "SockEngine::Process(): error: " << Anope::LastError(); + } + else if (sresult) + { + int processed = 0; + for (std::map<int, Socket *>::const_iterator it = Sockets.begin(), it_end = Sockets.end(); it != it_end && processed != sresult; ++it) + { + Socket *s = it->second; + + if (FD_ISSET(s->GetFD(), &efdset) || FD_ISSET(s->GetFD(), &rfdset) || FD_ISSET(s->GetFD(), &wfdset)) + ++processed; + if (s->HasFlag(SF_DEAD)) + continue; + if (FD_ISSET(s->GetFD(), &efdset)) + { + s->ProcessError(); + s->SetFlag(SF_DEAD); + continue; + } + if (FD_ISSET(s->GetFD(), &rfdset) && !s->ProcessRead()) + s->SetFlag(SF_DEAD); + if (FD_ISSET(s->GetFD(), &wfdset) && !s->ProcessWrite()) + s->SetFlag(SF_DEAD); + } + + for (std::map<int, Socket *>::iterator it = Sockets.begin(), it_end = Sockets.end(); it != it_end; ) + { + Socket *s = it->second; + ++it; + + if (s->HasFlag(SF_DEAD)) + delete s; + } + } +} + |