summaryrefslogtreecommitdiff
path: root/modules/database/db_redis.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'modules/database/db_redis.cpp')
-rw-r--r--modules/database/db_redis.cpp648
1 files changed, 648 insertions, 0 deletions
diff --git a/modules/database/db_redis.cpp b/modules/database/db_redis.cpp
new file mode 100644
index 000000000..03c7dabca
--- /dev/null
+++ b/modules/database/db_redis.cpp
@@ -0,0 +1,648 @@
+/*
+ *
+ * (C) 2003-2013 Anope Team
+ * Contact us at team@anope.org
+ *
+ * Please read COPYING and README for further details.
+ *
+ *
+ */
+
+#include "module.h"
+#include "modules/redis.h"
+
+using namespace Redis;
+
+class DatabaseRedis;
+static DatabaseRedis *me;
+
+class Data : public Serialize::Data
+{
+ public:
+ std::map<Anope::string, std::stringstream *> data;
+
+ ~Data()
+ {
+ for (std::map<Anope::string, std::stringstream *>::iterator it = data.begin(), it_end = data.end(); it != it_end; ++it)
+ delete it->second;
+ }
+
+ std::iostream& operator[](const Anope::string &key) anope_override
+ {
+ std::stringstream* &stream = data[key];
+ if (!stream)
+ stream = new std::stringstream();
+ return *stream;
+ }
+
+ std::set<Anope::string> KeySet() const anope_override
+ {
+ std::set<Anope::string> keys;
+ for (std::map<Anope::string, std::stringstream *>::const_iterator it = this->data.begin(), it_end = this->data.end(); it != it_end; ++it)
+ keys.insert(it->first);
+ return keys;
+ }
+
+ size_t Hash() const anope_override
+ {
+ size_t hash = 0;
+ for (std::map<Anope::string, std::stringstream *>::const_iterator it = this->data.begin(), it_end = this->data.end(); it != it_end; ++it)
+ if (!it->second->str().empty())
+ hash ^= Anope::hash_cs()(it->second->str());
+ return hash;
+ }
+};
+
+class TypeLoader : public Interface
+{
+ Anope::string type;
+ public:
+ TypeLoader(Module *creator, const Anope::string &t) : Interface(creator), type(t) { }
+
+ void OnResult(const Reply &r) anope_override;
+};
+
+class ObjectLoader : public Interface
+{
+ Anope::string type;
+ int64_t id;
+
+ public:
+ ObjectLoader(Module *creator, const Anope::string &t, int64_t i) : Interface(creator), type(t), id(i) { }
+
+ void OnResult(const Reply &r) anope_override;
+};
+
+class IDInterface : public Interface
+{
+ Reference<Serializable> o;
+ public:
+ IDInterface(Module *creator, Serializable *obj) : Interface(creator), o(obj) { }
+
+ void OnResult(const Reply &r) anope_override;
+};
+
+class Deleter : public Interface
+{
+ Anope::string type;
+ int64_t id;
+ public:
+ Deleter(Module *creator, const Anope::string &t, int64_t i) : Interface(creator), type(t), id(i) { }
+
+ void OnResult(const Reply &r) anope_override;
+};
+
+class Updater : public Interface
+{
+ Anope::string type;
+ int64_t id;
+ public:
+ Updater(Module *creator, const Anope::string &t, int64_t i) : Interface(creator), type(t), id(i) { }
+
+ void OnResult(const Reply &r) anope_override;
+};
+
+class ModifiedObject : public Interface
+{
+ Anope::string type;
+ int64_t id;
+ public:
+ ModifiedObject(Module *creator, const Anope::string &t, int64_t i) : Interface(creator), type(t), id(i) { }
+
+ void OnResult(const Reply &r) anope_override;
+};
+
+class SubscriptionListener : public Interface
+{
+ public:
+ SubscriptionListener(Module *creator) : Interface(creator) { }
+
+ void OnResult(const Reply &r) anope_override;
+};
+
+class DatabaseRedis : public Module, public Pipe
+{
+ SubscriptionListener sl;
+ std::set<Serializable *> updated_items;
+
+ public:
+ ServiceReference<Provider> redis;
+
+ DatabaseRedis(const Anope::string &modname, const Anope::string &creator) : Module(modname, creator, DATABASE | VENDOR), sl(this)
+ {
+ me = this;
+
+ Implementation i[] = { I_OnReload, I_OnLoadDatabase, I_OnSerializeTypeCreate, I_OnSerializableConstruct, I_OnSerializableDestruct, I_OnSerializableUpdate };
+ ModuleManager::Attach(i, this, sizeof(i) / sizeof(Implementation));
+ }
+
+ /* Insert or update an object */
+ void InsertObject(Serializable *obj)
+ {
+ Serialize::Type *type = obj->GetSerializableType();
+
+ /* If there is no id yet for ths object, get one */
+ if (!obj->id)
+ redis->SendCommand(new IDInterface(this, obj), "INCR id:" + type->GetName());
+ else
+ {
+ Data data;
+ obj->Serialize(data);
+
+ if (obj->IsCached(data))
+ return;
+
+ obj->UpdateCache(data);
+
+ std::vector<Anope::string> args;
+ args.push_back("HGETALL");
+ args.push_back("hash:" + type->GetName() + ":" + stringify(obj->id));
+
+ /* Get object attrs to clear before updating */
+ redis->SendCommand(new Updater(this, type->GetName(), obj->id), args);
+ }
+ }
+
+ void OnNotify() anope_override
+ {
+ for (std::set<Serializable *>::iterator it = this->updated_items.begin(), it_end = this->updated_items.end(); it != it_end; ++it)
+ {
+ Serializable *s = *it;
+
+ this->InsertObject(s);
+ }
+
+ this->updated_items.clear();
+ }
+
+ void OnReload(Configuration::Conf *conf) anope_override
+ {
+ Configuration::Block *block = conf->GetModule(this);
+ this->redis = ServiceReference<Provider>("Redis::Provider", block->Get<const Anope::string>("engine", "redis/main"));
+ }
+
+ EventReturn OnLoadDatabase() anope_override
+ {
+ const std::vector<Anope::string> type_order = Serialize::Type::GetTypeOrder();
+ for (unsigned i = 0; i < type_order.size(); ++i)
+ {
+ Serialize::Type *sb = Serialize::Type::Find(type_order[i]);
+ this->OnSerializeTypeCreate(sb);
+ }
+
+ while (redis->BlockAndProcess());
+
+ redis->Subscribe(&this->sl, "__keyspace@*__:hash:*");
+
+ return EVENT_STOP;
+ }
+
+ void OnSerializeTypeCreate(Serialize::Type *sb) anope_override
+ {
+ if (!redis)
+ return;
+
+ std::vector<Anope::string> args;
+ args.push_back("SMEMBERS");
+ args.push_back("ids:" + sb->GetName());
+
+ redis->SendCommand(new TypeLoader(this, sb->GetName()), args);
+ }
+
+ void OnSerializableConstruct(Serializable *obj) anope_override
+ {
+ this->updated_items.insert(obj);
+ this->Notify();
+ }
+
+ void OnSerializableDestruct(Serializable *obj) anope_override
+ {
+ Serialize::Type *type = obj->GetSerializableType();
+
+ std::vector<Anope::string> args;
+ args.push_back("HGETALL");
+ args.push_back("hash:" + type->GetName() + ":" + stringify(obj->id));
+
+ /* Get all of the attributes for this object */
+ redis->SendCommand(new Deleter(this, type->GetName(), obj->id), args);
+
+ this->updated_items.erase(obj);
+ type->objects.erase(obj->id);
+ this->Notify();
+ }
+
+ void OnSerializableUpdate(Serializable *obj) anope_override
+ {
+ this->updated_items.insert(obj);
+ this->Notify();
+ }
+};
+
+void TypeLoader::OnResult(const Reply &r)
+{
+ if (r.type != Reply::MULTI_BULK || !me->redis)
+ {
+ delete this;
+ return;
+ }
+
+ for (unsigned i = 0; i < r.multi_bulk.size(); ++i)
+ {
+ const Reply &reply = r.multi_bulk[i];
+
+ if (reply.type != Reply::BULK)
+ continue;
+
+ int64_t i;
+ try
+ {
+ i = convertTo<int64_t>(reply.bulk);
+ }
+ catch (const ConvertException &)
+ {
+ continue;
+ }
+
+ std::vector<Anope::string> args;
+ args.push_back("HGETALL");
+ args.push_back("hash:" + this->type + ":" + stringify(i));
+
+ me->redis->SendCommand(new ObjectLoader(me, this->type, i), args);
+ }
+
+ delete this;
+}
+
+void ObjectLoader::OnResult(const Reply &r)
+{
+ Serialize::Type *st = Serialize::Type::Find(this->type);
+
+ if (r.type != Reply::MULTI_BULK || r.multi_bulk.empty() || !me->redis || !st)
+ {
+ delete this;
+ return;
+ }
+
+ Data data;
+
+ for (unsigned i = 0; i + 1 < r.multi_bulk.size(); i += 2)
+ {
+ const Reply &key = r.multi_bulk[i],
+ &value = r.multi_bulk[i + 1];
+
+ data[key.bulk] << value.bulk;
+ }
+
+ Serializable* &obj = st->objects[this->id];
+ obj = st->Unserialize(obj, data);
+ if (obj)
+ {
+ obj->id = this->id;
+ obj->UpdateCache(data);
+ }
+
+ delete this;
+}
+
+void IDInterface::OnResult(const Reply &r)
+{
+ if (!o || r.type != Reply::INT || !r.i)
+ {
+ delete this;
+ return;
+ }
+
+ Serializable* &obj = o->GetSerializableType()->objects[r.i];
+ if (obj)
+ /* This shouldn't be possible */
+ obj->id = 0;
+
+ o->id = r.i;
+ obj = o;
+
+ /* Now that we have the id, insert this object for real */
+ anope_dynamic_static_cast<DatabaseRedis *>(this->owner)->InsertObject(o);
+
+ delete this;
+}
+
+void Deleter::OnResult(const Reply &r)
+{
+ if (r.type != Reply::MULTI_BULK || !me->redis || r.multi_bulk.empty())
+ {
+ delete this;
+ return;
+ }
+
+ /* Transaction start */
+ me->redis->StartTransaction();
+
+ std::vector<Anope::string> args;
+ args.push_back("DEL");
+ args.push_back("hash:" + this->type + ":" + stringify(this->id));
+
+ /* Delete hash object */
+ me->redis->SendCommand(NULL, args);
+
+ args.clear();
+ args.push_back("SREM");
+ args.push_back("ids:" + this->type);
+ args.push_back(stringify(this->id));
+
+ /* Delete id from ids set */
+ me->redis->SendCommand(NULL, args);
+
+ for (unsigned i = 0; i + 1 < r.multi_bulk.size(); i += 2)
+ {
+ const Reply &key = r.multi_bulk[i],
+ &value = r.multi_bulk[i + 1];
+
+ args.clear();
+ args.push_back("SREM");
+ args.push_back("value:" + this->type + ":" + key.bulk + ":" + value.bulk);
+ args.push_back(stringify(this->id));
+
+ /* Delete value -> object id */
+ me->redis->SendCommand(NULL, args);
+ }
+
+ /* Transaction end */
+ me->redis->CommitTransaction();
+
+ delete this;
+}
+
+void Updater::OnResult(const Reply &r)
+{
+ Serialize::Type *st = Serialize::Type::Find(this->type);
+
+ if (!st)
+ {
+ delete this;
+ return;
+ }
+
+ Serializable *obj = st->objects[this->id];
+ if (!obj)
+ {
+ delete this;
+ return;
+ }
+
+ Data data;
+ obj->Serialize(data);
+
+ /* Transaction start */
+ me->redis->StartTransaction();
+
+ for (unsigned i = 0; i + 1 < r.multi_bulk.size(); i += 2)
+ {
+ const Reply &key = r.multi_bulk[i],
+ &value = r.multi_bulk[i + 1];
+
+ std::vector<Anope::string> args;
+ args.push_back("SREM");
+ args.push_back("value:" + this->type + ":" + key.bulk + ":" + value.bulk);
+ args.push_back(stringify(this->id));
+
+ /* Delete value -> object id */
+ me->redis->SendCommand(NULL, args);
+ }
+
+ /* Add object id to id set for this type */
+ std::vector<Anope::string> args;
+ args.push_back("SADD");
+ args.push_back("ids:" + this->type);
+ args.push_back(stringify(obj->id));
+ me->redis->SendCommand(NULL, args);
+
+ args.clear();
+ args.push_back("HMSET");
+ args.push_back("hash:" + this->type + ":" + stringify(obj->id));
+
+ typedef std::map<Anope::string, std::stringstream *> items;
+ for (items::iterator it = data.data.begin(), it_end = data.data.end(); it != it_end; ++it)
+ {
+ const Anope::string &key = it->first;
+ std::stringstream *value = it->second;
+
+ args.push_back(key);
+ args.push_back(value->str());
+
+ std::vector<Anope::string> args2;
+
+ args2.push_back("SADD");
+ args2.push_back("value:" + this->type + ":" + key + ":" + value->str());
+ args2.push_back(stringify(obj->id));
+
+ /* Add to value -> object id set */
+ me->redis->SendCommand(NULL, args2);
+ }
+
+ ++obj->redis_ignore;
+
+ /* Add object */
+ me->redis->SendCommand(NULL, args);
+
+ /* Transaction end */
+ me->redis->CommitTransaction();
+
+ delete this;
+}
+
+void SubscriptionListener::OnResult(const Reply &r)
+{
+ /*
+ * [May 15 13:59:35.645839 2013] Debug: pmessage
+ * [May 15 13:59:35.645866 2013] Debug: __keyspace@*__:anope:hash:*
+ * [May 15 13:59:35.645880 2013] Debug: __keyspace@0__:anope:hash:type:id
+ * [May 15 13:59:35.645893 2013] Debug: hset
+ */
+ if (r.multi_bulk.size() != 4)
+ return;
+
+ size_t sz = r.multi_bulk[2].bulk.find(':');
+ if (sz == Anope::string::npos)
+ return;
+
+ const Anope::string &key = r.multi_bulk[2].bulk.substr(sz + 1),
+ &op = r.multi_bulk[3].bulk;
+
+ sz = key.rfind(':');
+ if (sz == Anope::string::npos)
+ return;
+
+ const Anope::string &id = key.substr(sz + 1);
+
+ size_t sz2 = key.rfind(':', sz - 1);
+ if (sz2 == Anope::string::npos)
+ return;
+ const Anope::string &type = key.substr(sz2 + 1, sz - sz2 - 1);
+
+ Serialize::Type *s_type = Serialize::Type::Find(type);
+
+ if (s_type == NULL)
+ return;
+
+ uint64_t obj_id;
+ try
+ {
+ obj_id = convertTo<uint64_t>(id);
+ }
+ catch (const ConvertException &)
+ {
+ return;
+ }
+
+ if (op == "hset" || op == "hdel")
+ {
+ Serializable *s = s_type->objects[obj_id];
+
+ if (s && s->redis_ignore)
+ {
+ --s->redis_ignore;
+ Log(LOG_DEBUG) << "redis: notify: got modify for object id " << obj_id << " of type " << type << ", but I am ignoring it";
+ }
+ else
+ {
+ Log(LOG_DEBUG) << "redis: notify: got modify for object id " << obj_id << " of type " << type;
+
+ std::vector<Anope::string> args;
+ args.push_back("HGETALL");
+ args.push_back("hash:" + type + ":" + id);
+
+ me->redis->SendCommand(new ModifiedObject(me, type, obj_id), args);
+ }
+ }
+ else if (op == "del")
+ {
+ Serializable* &s = s_type->objects[obj_id];
+ if (s == NULL)
+ return;
+
+ Log(LOG_DEBUG) << "redis: notify: deleting object id " << obj_id << " of type " << type;
+
+ Data data;
+
+ s->Serialize(data);
+
+ /* Transaction start */
+ me->redis->StartTransaction();
+
+ typedef std::map<Anope::string, std::stringstream *> items;
+ for (items::iterator it = data.data.begin(), it_end = data.data.end(); it != it_end; ++it)
+ {
+ const Anope::string &key = it->first;
+ std::stringstream *value = it->second;
+
+ std::vector<Anope::string> args;
+ args.push_back("SREM");
+ args.push_back("value:" + type + ":" + key + ":" + value->str());
+ args.push_back(id);
+
+ /* Delete value -> object id */
+ me->redis->SendCommand(NULL, args);
+ }
+
+ std::vector<Anope::string> args;
+ args.push_back("SREM");
+ args.push_back("ids:" + type);
+ args.push_back(stringify(s->id));
+
+ /* Delete object from id set */
+ me->redis->SendCommand(NULL, args);
+
+ /* Transaction end */
+ me->redis->CommitTransaction();
+
+ delete s;
+ s = NULL;
+ }
+}
+
+void ModifiedObject::OnResult(const Reply &r)
+{
+ Serialize::Type *st = Serialize::Type::Find(this->type);
+
+ if (!st)
+ {
+ delete this;
+ return;
+ }
+
+ Serializable* &obj = st->objects[this->id];
+
+ /* Transaction start */
+ me->redis->StartTransaction();
+
+ /* Erase old object values */
+ if (obj)
+ {
+ Data data;
+
+ obj->Serialize(data);
+
+ typedef std::map<Anope::string, std::stringstream *> items;
+ for (items::iterator it = data.data.begin(), it_end = data.data.end(); it != it_end; ++it)
+ {
+ const Anope::string &key = it->first;
+ std::stringstream *value = it->second;
+
+ std::vector<Anope::string> args;
+ args.push_back("SREM");
+ args.push_back("value:" + st->GetName() + ":" + key + ":" + value->str());
+ args.push_back(stringify(this->id));
+
+ /* Delete value -> object id */
+ me->redis->SendCommand(NULL, args);
+ }
+ }
+
+ Data data;
+
+ for (unsigned i = 0; i + 1 < r.multi_bulk.size(); i += 2)
+ {
+ const Reply &key = r.multi_bulk[i],
+ &value = r.multi_bulk[i + 1];
+
+ data[key.bulk] << value.bulk;
+ }
+
+ obj = st->Unserialize(obj, data);
+ if (obj)
+ {
+ obj->id = this->id;
+ obj->UpdateCache(data);
+
+ /* Insert new object values */
+ typedef std::map<Anope::string, std::stringstream *> items;
+ for (items::iterator it = data.data.begin(), it_end = data.data.end(); it != it_end; ++it)
+ {
+ const Anope::string &key = it->first;
+ std::stringstream *value = it->second;
+
+ std::vector<Anope::string> args;
+ args.push_back("SADD");
+ args.push_back("value:" + st->GetName() + ":" + key + ":" + value->str());
+ args.push_back(stringify(obj->id));
+
+ /* Add to value -> object id set */
+ me->redis->SendCommand(NULL, args);
+ }
+
+ std::vector<Anope::string> args;
+ args.push_back("SADD");
+ args.push_back("ids:" + st->GetName());
+ args.push_back(stringify(obj->id));
+
+ /* Add to type -> id set */
+ me->redis->SendCommand(NULL, args);
+ }
+
+ /* Transaction end */
+ me->redis->CommitTransaction();
+
+ delete this;
+}
+
+MODULE_INIT(DatabaseRedis)