diff options
Diffstat (limited to 'modules/m_redis.cpp')
-rw-r--r-- | modules/m_redis.cpp | 618 |
1 files changed, 0 insertions, 618 deletions
diff --git a/modules/m_redis.cpp b/modules/m_redis.cpp deleted file mode 100644 index 7945ab9cc..000000000 --- a/modules/m_redis.cpp +++ /dev/null @@ -1,618 +0,0 @@ -/* - * - * (C) 2003-2017 Anope Team - * Contact us at team@anope.org - * - * Please read COPYING and README for further details. - */ - -#include "module.h" -#include "modules/redis.h" - -using namespace Redis; - -class MyRedisService; - -class RedisSocket : public BinarySocket, public ConnectionSocket -{ - size_t ParseReply(Reply &r, const char *buf, size_t l); - public: - MyRedisService *provider; - std::deque<Interface *> interfaces; - std::map<Anope::string, Interface *> subinterfaces; - - RedisSocket(MyRedisService *pro, bool v6) : Socket(-1, v6), provider(pro) { } - - ~RedisSocket(); - - void OnConnect() anope_override; - void OnError(const Anope::string &error) anope_override; - - bool Read(const char *buffer, size_t l) anope_override; -}; - -class Transaction : public Interface -{ - public: - std::deque<Interface *> interfaces; - - Transaction(Module *creator) : Interface(creator) { } - - ~Transaction() - { - for (unsigned i = 0; i < interfaces.size(); ++i) - { - Interface *inter = interfaces[i]; - - if (!inter) - continue; - - inter->OnError("Interface going away"); - } - } - - void OnResult(const Reply &r) anope_override - { - /* This is a multi bulk reply of the results of the queued commands - * in this transaction - */ - - Log(LOG_DEBUG_2) << "redis: transaction complete with " << r.multi_bulk.size() << " results"; - - for (unsigned i = 0; i < r.multi_bulk.size(); ++i) - { - const Reply *reply = r.multi_bulk[i]; - - if (interfaces.empty()) - break; - - Interface *inter = interfaces.front(); - interfaces.pop_front(); - - if (inter) - inter->OnResult(*reply); - } - } -}; - -class MyRedisService : public Provider -{ - public: - Anope::string host; - int port; - unsigned db; - - RedisSocket *sock, *sub; - - Transaction ti; - bool in_transaction; - - MyRedisService(Module *c, const Anope::string &n, const Anope::string &h, int p, unsigned d) : Provider(c, n), host(h), port(p), db(d), sock(NULL), sub(NULL), - ti(c), in_transaction(false) - { - sock = new RedisSocket(this, host.find(':') != Anope::string::npos); - sock->Connect(host, port); - - sub = new RedisSocket(this, host.find(':') != Anope::string::npos); - sub->Connect(host, port); - } - - ~MyRedisService() - { - if (sock) - { - sock->flags[SF_DEAD] = true; - sock->provider = NULL; - } - - if (sub) - { - sub->flags[SF_DEAD] = true; - sub->provider = NULL; - } - } - - private: - inline void Pack(std::vector<char> &buffer, const char *buf, size_t sz = 0) - { - if (!sz) - sz = strlen(buf); - - size_t old_size = buffer.size(); - buffer.resize(old_size + sz); - std::copy(buf, buf + sz, buffer.begin() + old_size); - } - - void Send(RedisSocket *s, Interface *i, const std::vector<std::pair<const char *, size_t> > &args) - { - std::vector<char> buffer; - - Pack(buffer, "*"); - Pack(buffer, stringify(args.size()).c_str()); - Pack(buffer, "\r\n"); - - for (unsigned j = 0; j < args.size(); ++j) - { - const std::pair<const char *, size_t> &pair = args[j]; - - Pack(buffer, "$"); - Pack(buffer, stringify(pair.second).c_str()); - Pack(buffer, "\r\n"); - - Pack(buffer, pair.first, pair.second); - Pack(buffer, "\r\n"); - } - - if (buffer.empty()) - return; - - s->Write(&buffer[0], buffer.size()); - if (in_transaction) - { - ti.interfaces.push_back(i); - s->interfaces.push_back(NULL); // For the +Queued response - } - else - s->interfaces.push_back(i); - } - - public: - void SendCommand(RedisSocket *s, Interface *i, const std::vector<Anope::string> &cmds) - { - std::vector<std::pair<const char *, size_t> > args; - for (unsigned j = 0; j < cmds.size(); ++j) - args.push_back(std::make_pair(cmds[j].c_str(), cmds[j].length())); - this->Send(s, i, args); - } - - void SendCommand(RedisSocket *s, Interface *i, const Anope::string &str) - { - std::vector<Anope::string> args; - spacesepstream(str).GetTokens(args); - this->SendCommand(s, i, args); - } - - void Send(Interface *i, const std::vector<std::pair<const char *, size_t> > &args) - { - if (!sock) - { - sock = new RedisSocket(this, host.find(':') != Anope::string::npos); - sock->Connect(host, port); - } - - this->Send(sock, i, args); - } - - void SendCommand(Interface *i, const std::vector<Anope::string> &cmds) anope_override - { - std::vector<std::pair<const char *, size_t> > args; - for (unsigned j = 0; j < cmds.size(); ++j) - args.push_back(std::make_pair(cmds[j].c_str(), cmds[j].length())); - this->Send(i, args); - } - - void SendCommand(Interface *i, const Anope::string &str) anope_override - { - std::vector<Anope::string> args; - spacesepstream(str).GetTokens(args); - this->SendCommand(i, args); - } - - public: - bool BlockAndProcess() anope_override - { - this->sock->ProcessWrite(); - this->sock->SetBlocking(true); - this->sock->ProcessRead(); - this->sock->SetBlocking(false); - return !this->sock->interfaces.empty(); - } - - void Subscribe(Interface *i, const Anope::string &pattern) anope_override - { - if (sub == NULL) - { - sub = new RedisSocket(this, host.find(':') != Anope::string::npos); - sub->Connect(host, port); - } - - std::vector<Anope::string> args; - args.push_back("PSUBSCRIBE"); - args.push_back(pattern); - this->SendCommand(sub, NULL, args); - - sub->subinterfaces[pattern] = i; - } - - void Unsubscribe(const Anope::string &pattern) anope_override - { - if (sub) - sub->subinterfaces.erase(pattern); - } - - void StartTransaction() anope_override - { - if (in_transaction) - throw CoreException(); - - this->SendCommand(NULL, "MULTI"); - in_transaction = true; - } - - void CommitTransaction() anope_override - { - /* The result of the transaction comes back to the reply of EXEC as a multi bulk. - * The reply to the individual commands that make up the transaction when executed - * is a simple +QUEUED - */ - in_transaction = false; - this->SendCommand(&this->ti, "EXEC"); - } -}; - -RedisSocket::~RedisSocket() -{ - if (provider) - { - if (provider->sock == this) - provider->sock = NULL; - else if (provider->sub == this) - provider->sub = NULL; - } - - for (unsigned i = 0; i < interfaces.size(); ++i) - { - Interface *inter = interfaces[i]; - - if (!inter) - continue; - - inter->OnError("Interface going away"); - } -} - -void RedisSocket::OnConnect() -{ - Log() << "redis: Successfully connected to " << provider->name << (this == this->provider->sub ? " (sub)" : ""); - - this->provider->SendCommand(NULL, "CLIENT SETNAME Anope"); - this->provider->SendCommand(NULL, "SELECT " + stringify(provider->db)); - - if (this != this->provider->sub) - { - this->provider->SendCommand(this, NULL, "CONFIG SET notify-keyspace-events KA"); - } -} - -void RedisSocket::OnError(const Anope::string &error) -{ - Log() << "redis: Error on " << provider->name << (this == this->provider->sub ? " (sub)" : "") << ": " << error; -} - -size_t RedisSocket::ParseReply(Reply &r, const char *buffer, size_t l) -{ - size_t used = 0; - - if (!l) - return used; - - if (r.type == Reply::MULTI_BULK) - goto multi_bulk_cont; - - switch (*buffer) - { - case '+': - { - Anope::string reason(buffer, 1, l - 1); - size_t nl = reason.find("\r\n"); - Log(LOG_DEBUG_2) << "redis: status ok: " << reason.substr(0, nl); - if (nl != Anope::string::npos) - { - r.type = Reply::OK; - used = 1 + nl + 2; - } - break; - } - case '-': - { - Anope::string reason(buffer, 1, l - 1); - size_t nl = reason.find("\r\n"); - Log(LOG_DEBUG) << "redis: status error: " << reason.substr(0, nl); - if (nl != Anope::string::npos) - { - r.type = Reply::NOT_OK; - used = 1 + nl + 2; - } - break; - } - case ':': - { - Anope::string ibuf(buffer, 1, l - 1); - size_t nl = ibuf.find("\r\n"); - if (nl != Anope::string::npos) - { - try - { - r.i = convertTo<int64_t>(ibuf.substr(0, nl)); - } - catch (const ConvertException &) { } - - r.type = Reply::INT; - used = 1 + nl + 2; - } - break; - } - case '$': - { - Anope::string reply(buffer + 1, l - 1); - /* This assumes one bulk can always fit in our recv buffer */ - size_t nl = reply.find("\r\n"); - if (nl != Anope::string::npos) - { - int len; - try - { - len = convertTo<int>(reply.substr(0, nl)); - if (len >= 0) - { - if (1 + nl + 2 + len + 2 <= l) - { - used = 1 + nl + 2 + len + 2; - r.bulk = reply.substr(nl + 2, len); - r.type = Reply::BULK; - } - } - else - { - used = 1 + nl + 2 + 2; - r.type = Reply::BULK; - } - } - catch (const ConvertException &) { } - } - break; - } - multi_bulk_cont: - case '*': - { - if (r.type != Reply::MULTI_BULK) - { - Anope::string reply(buffer + 1, l - 1); - size_t nl = reply.find("\r\n"); - if (nl != Anope::string::npos) - { - r.type = Reply::MULTI_BULK; - try - { - r.multi_bulk_size = convertTo<int>(reply.substr(0, nl)); - } - catch (const ConvertException &) { } - - used = 1 + nl + 2; - } - else - break; - } - else if (r.multi_bulk_size >= 0 && r.multi_bulk.size() == static_cast<unsigned>(r.multi_bulk_size)) - { - /* This multi bulk is already complete, so check the sub bulks */ - for (unsigned i = 0; i < r.multi_bulk.size(); ++i) - if (r.multi_bulk[i]->type == Reply::MULTI_BULK) - ParseReply(*r.multi_bulk[i], buffer + used, l - used); - break; - } - - for (int i = r.multi_bulk.size(); i < r.multi_bulk_size; ++i) - { - Reply *reply = new Reply(); - size_t u = ParseReply(*reply, buffer + used, l - used); - if (!u) - { - Log(LOG_DEBUG) << "redis: ran out of data to parse"; - delete reply; - break; - } - r.multi_bulk.push_back(reply); - used += u; - } - break; - } - default: - Log(LOG_DEBUG) << "redis: unknown reply " << *buffer; - } - - return used; -} - -bool RedisSocket::Read(const char *buffer, size_t l) -{ - static std::vector<char> save; - std::vector<char> copy; - - if (!save.empty()) - { - std::copy(buffer, buffer + l, std::back_inserter(save)); - - copy = save; - - buffer = ©[0]; - l = copy.size(); - } - - while (l) - { - static Reply r; - - size_t used = this->ParseReply(r, buffer, l); - if (!used) - { - Log(LOG_DEBUG) << "redis: used == 0 ?"; - r.Clear(); - break; - } - else if (used > l) - { - Log(LOG_DEBUG) << "redis: used > l ?"; - r.Clear(); - break; - } - - /* Full result is not here yet */ - if (r.type == Reply::MULTI_BULK && static_cast<unsigned>(r.multi_bulk_size) != r.multi_bulk.size()) - { - buffer += used; - l -= used; - break; - } - - if (this == provider->sub) - { - if (r.multi_bulk.size() == 4) - { - /* pmessage - * pattern subscribed to - * __keyevent@0__:set - * key - */ - std::map<Anope::string, Interface *>::iterator it = this->subinterfaces.find(r.multi_bulk[1]->bulk); - if (it != this->subinterfaces.end()) - it->second->OnResult(r); - } - } - else - { - if (this->interfaces.empty()) - { - Log(LOG_DEBUG) << "redis: no interfaces?"; - } - else - { - Interface *i = this->interfaces.front(); - this->interfaces.pop_front(); - - if (i) - { - if (r.type != Reply::NOT_OK) - i->OnResult(r); - else - i->OnError(r.bulk); - } - } - } - - buffer += used; - l -= used; - - r.Clear(); - } - - if (l) - { - save.resize(l); - std::copy(buffer, buffer + l, save.begin()); - } - else - std::vector<char>().swap(save); - - return true; -} - - -class ModuleRedis : public Module -{ - std::map<Anope::string, MyRedisService *> services; - - public: - ModuleRedis(const Anope::string &modname, const Anope::string &creator) : Module(modname, creator, EXTRA | VENDOR) - { - } - - ~ModuleRedis() - { - for (std::map<Anope::string, MyRedisService *>::iterator it = services.begin(); it != services.end(); ++it) - { - MyRedisService *p = it->second; - - delete p->sock; - p->sock = NULL; - delete p->sub; - p->sub = NULL; - - delete p; - } - } - - void OnReload(Configuration::Conf *conf) anope_override - { - Configuration::Block *block = conf->GetModule(this); - std::vector<Anope::string> new_services; - - for (int i = 0; i < block->CountBlock("redis"); ++i) - { - Configuration::Block *redis = block->GetBlock("redis", i); - - const Anope::string &n = redis->Get<const Anope::string>("name"), - &ip = redis->Get<const Anope::string>("ip"); - int port = redis->Get<int>("port"); - unsigned db = redis->Get<unsigned>("db"); - - delete services[n]; - services[n] = new MyRedisService(this, n, ip, port, db); - new_services.push_back(n); - } - - for (std::map<Anope::string, MyRedisService *>::iterator it = services.begin(); it != services.end();) - { - Provider *p = it->second; - ++it; - - if (std::find(new_services.begin(), new_services.end(), p->name) == new_services.end()) - delete it->second; - } - } - - void OnModuleUnload(User *, Module *m) anope_override - { - for (std::map<Anope::string, MyRedisService *>::iterator it = services.begin(); it != services.end(); ++it) - { - MyRedisService *p = it->second; - - if (p->sock) - for (unsigned i = p->sock->interfaces.size(); i > 0; --i) - { - Interface *inter = p->sock->interfaces[i - 1]; - - if (inter && inter->owner == m) - { - inter->OnError(m->name + " being unloaded"); - p->sock->interfaces.erase(p->sock->interfaces.begin() + i - 1); - } - } - - if (p->sub) - for (unsigned i = p->sub->interfaces.size(); i > 0; --i) - { - Interface *inter = p->sub->interfaces[i - 1]; - - if (inter && inter->owner == m) - { - inter->OnError(m->name + " being unloaded"); - p->sub->interfaces.erase(p->sub->interfaces.begin() + i - 1); - } - } - - for (unsigned i = p->ti.interfaces.size(); i > 0; --i) - { - Interface *inter = p->ti.interfaces[i - 1]; - - if (inter && inter->owner == m) - { - inter->OnError(m->name + " being unloaded"); - p->ti.interfaces.erase(p->ti.interfaces.begin() + i - 1); - } - } - } - } -}; - -MODULE_INIT(ModuleRedis) |