From 2428264315868f0860f9747c8b005536e5442db6 Mon Sep 17 00:00:00 2001 From: Adam Date: Fri, 17 May 2013 23:04:18 -0400 Subject: Add Redis database support --- modules/database/db_redis.cpp | 648 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 648 insertions(+) create mode 100644 modules/database/db_redis.cpp (limited to 'modules/database/db_redis.cpp') diff --git a/modules/database/db_redis.cpp b/modules/database/db_redis.cpp new file mode 100644 index 000000000..03c7dabca --- /dev/null +++ b/modules/database/db_redis.cpp @@ -0,0 +1,648 @@ +/* + * + * (C) 2003-2013 Anope Team + * Contact us at team@anope.org + * + * Please read COPYING and README for further details. + * + * + */ + +#include "module.h" +#include "modules/redis.h" + +using namespace Redis; + +class DatabaseRedis; +static DatabaseRedis *me; + +class Data : public Serialize::Data +{ + public: + std::map data; + + ~Data() + { + for (std::map::iterator it = data.begin(), it_end = data.end(); it != it_end; ++it) + delete it->second; + } + + std::iostream& operator[](const Anope::string &key) anope_override + { + std::stringstream* &stream = data[key]; + if (!stream) + stream = new std::stringstream(); + return *stream; + } + + std::set KeySet() const anope_override + { + std::set keys; + for (std::map::const_iterator it = this->data.begin(), it_end = this->data.end(); it != it_end; ++it) + keys.insert(it->first); + return keys; + } + + size_t Hash() const anope_override + { + size_t hash = 0; + for (std::map::const_iterator it = this->data.begin(), it_end = this->data.end(); it != it_end; ++it) + if (!it->second->str().empty()) + hash ^= Anope::hash_cs()(it->second->str()); + return hash; + } +}; + +class TypeLoader : public Interface +{ + Anope::string type; + public: + TypeLoader(Module *creator, const Anope::string &t) : Interface(creator), type(t) { } + + void OnResult(const Reply &r) anope_override; +}; + +class ObjectLoader : public Interface +{ + Anope::string type; + int64_t id; + + public: + ObjectLoader(Module *creator, const Anope::string &t, int64_t i) : Interface(creator), type(t), id(i) { } + + void OnResult(const Reply &r) anope_override; +}; + +class IDInterface : public Interface +{ + Reference o; + public: + IDInterface(Module *creator, Serializable *obj) : Interface(creator), o(obj) { } + + void OnResult(const Reply &r) anope_override; +}; + +class Deleter : public Interface +{ + Anope::string type; + int64_t id; + public: + Deleter(Module *creator, const Anope::string &t, int64_t i) : Interface(creator), type(t), id(i) { } + + void OnResult(const Reply &r) anope_override; +}; + +class Updater : public Interface +{ + Anope::string type; + int64_t id; + public: + Updater(Module *creator, const Anope::string &t, int64_t i) : Interface(creator), type(t), id(i) { } + + void OnResult(const Reply &r) anope_override; +}; + +class ModifiedObject : public Interface +{ + Anope::string type; + int64_t id; + public: + ModifiedObject(Module *creator, const Anope::string &t, int64_t i) : Interface(creator), type(t), id(i) { } + + void OnResult(const Reply &r) anope_override; +}; + +class SubscriptionListener : public Interface +{ + public: + SubscriptionListener(Module *creator) : Interface(creator) { } + + void OnResult(const Reply &r) anope_override; +}; + +class DatabaseRedis : public Module, public Pipe +{ + SubscriptionListener sl; + std::set updated_items; + + public: + ServiceReference redis; + + DatabaseRedis(const Anope::string &modname, const Anope::string &creator) : Module(modname, creator, DATABASE | VENDOR), sl(this) + { + me = this; + + Implementation i[] = { I_OnReload, I_OnLoadDatabase, I_OnSerializeTypeCreate, I_OnSerializableConstruct, I_OnSerializableDestruct, I_OnSerializableUpdate }; + ModuleManager::Attach(i, this, sizeof(i) / sizeof(Implementation)); + } + + /* Insert or update an object */ + void InsertObject(Serializable *obj) + { + Serialize::Type *type = obj->GetSerializableType(); + + /* If there is no id yet for ths object, get one */ + if (!obj->id) + redis->SendCommand(new IDInterface(this, obj), "INCR id:" + type->GetName()); + else + { + Data data; + obj->Serialize(data); + + if (obj->IsCached(data)) + return; + + obj->UpdateCache(data); + + std::vector args; + args.push_back("HGETALL"); + args.push_back("hash:" + type->GetName() + ":" + stringify(obj->id)); + + /* Get object attrs to clear before updating */ + redis->SendCommand(new Updater(this, type->GetName(), obj->id), args); + } + } + + void OnNotify() anope_override + { + for (std::set::iterator it = this->updated_items.begin(), it_end = this->updated_items.end(); it != it_end; ++it) + { + Serializable *s = *it; + + this->InsertObject(s); + } + + this->updated_items.clear(); + } + + void OnReload(Configuration::Conf *conf) anope_override + { + Configuration::Block *block = conf->GetModule(this); + this->redis = ServiceReference("Redis::Provider", block->Get("engine", "redis/main")); + } + + EventReturn OnLoadDatabase() anope_override + { + const std::vector type_order = Serialize::Type::GetTypeOrder(); + for (unsigned i = 0; i < type_order.size(); ++i) + { + Serialize::Type *sb = Serialize::Type::Find(type_order[i]); + this->OnSerializeTypeCreate(sb); + } + + while (redis->BlockAndProcess()); + + redis->Subscribe(&this->sl, "__keyspace@*__:hash:*"); + + return EVENT_STOP; + } + + void OnSerializeTypeCreate(Serialize::Type *sb) anope_override + { + if (!redis) + return; + + std::vector args; + args.push_back("SMEMBERS"); + args.push_back("ids:" + sb->GetName()); + + redis->SendCommand(new TypeLoader(this, sb->GetName()), args); + } + + void OnSerializableConstruct(Serializable *obj) anope_override + { + this->updated_items.insert(obj); + this->Notify(); + } + + void OnSerializableDestruct(Serializable *obj) anope_override + { + Serialize::Type *type = obj->GetSerializableType(); + + std::vector args; + args.push_back("HGETALL"); + args.push_back("hash:" + type->GetName() + ":" + stringify(obj->id)); + + /* Get all of the attributes for this object */ + redis->SendCommand(new Deleter(this, type->GetName(), obj->id), args); + + this->updated_items.erase(obj); + type->objects.erase(obj->id); + this->Notify(); + } + + void OnSerializableUpdate(Serializable *obj) anope_override + { + this->updated_items.insert(obj); + this->Notify(); + } +}; + +void TypeLoader::OnResult(const Reply &r) +{ + if (r.type != Reply::MULTI_BULK || !me->redis) + { + delete this; + return; + } + + for (unsigned i = 0; i < r.multi_bulk.size(); ++i) + { + const Reply &reply = r.multi_bulk[i]; + + if (reply.type != Reply::BULK) + continue; + + int64_t i; + try + { + i = convertTo(reply.bulk); + } + catch (const ConvertException &) + { + continue; + } + + std::vector args; + args.push_back("HGETALL"); + args.push_back("hash:" + this->type + ":" + stringify(i)); + + me->redis->SendCommand(new ObjectLoader(me, this->type, i), args); + } + + delete this; +} + +void ObjectLoader::OnResult(const Reply &r) +{ + Serialize::Type *st = Serialize::Type::Find(this->type); + + if (r.type != Reply::MULTI_BULK || r.multi_bulk.empty() || !me->redis || !st) + { + delete this; + return; + } + + Data data; + + for (unsigned i = 0; i + 1 < r.multi_bulk.size(); i += 2) + { + const Reply &key = r.multi_bulk[i], + &value = r.multi_bulk[i + 1]; + + data[key.bulk] << value.bulk; + } + + Serializable* &obj = st->objects[this->id]; + obj = st->Unserialize(obj, data); + if (obj) + { + obj->id = this->id; + obj->UpdateCache(data); + } + + delete this; +} + +void IDInterface::OnResult(const Reply &r) +{ + if (!o || r.type != Reply::INT || !r.i) + { + delete this; + return; + } + + Serializable* &obj = o->GetSerializableType()->objects[r.i]; + if (obj) + /* This shouldn't be possible */ + obj->id = 0; + + o->id = r.i; + obj = o; + + /* Now that we have the id, insert this object for real */ + anope_dynamic_static_cast(this->owner)->InsertObject(o); + + delete this; +} + +void Deleter::OnResult(const Reply &r) +{ + if (r.type != Reply::MULTI_BULK || !me->redis || r.multi_bulk.empty()) + { + delete this; + return; + } + + /* Transaction start */ + me->redis->StartTransaction(); + + std::vector args; + args.push_back("DEL"); + args.push_back("hash:" + this->type + ":" + stringify(this->id)); + + /* Delete hash object */ + me->redis->SendCommand(NULL, args); + + args.clear(); + args.push_back("SREM"); + args.push_back("ids:" + this->type); + args.push_back(stringify(this->id)); + + /* Delete id from ids set */ + me->redis->SendCommand(NULL, args); + + for (unsigned i = 0; i + 1 < r.multi_bulk.size(); i += 2) + { + const Reply &key = r.multi_bulk[i], + &value = r.multi_bulk[i + 1]; + + args.clear(); + args.push_back("SREM"); + args.push_back("value:" + this->type + ":" + key.bulk + ":" + value.bulk); + args.push_back(stringify(this->id)); + + /* Delete value -> object id */ + me->redis->SendCommand(NULL, args); + } + + /* Transaction end */ + me->redis->CommitTransaction(); + + delete this; +} + +void Updater::OnResult(const Reply &r) +{ + Serialize::Type *st = Serialize::Type::Find(this->type); + + if (!st) + { + delete this; + return; + } + + Serializable *obj = st->objects[this->id]; + if (!obj) + { + delete this; + return; + } + + Data data; + obj->Serialize(data); + + /* Transaction start */ + me->redis->StartTransaction(); + + for (unsigned i = 0; i + 1 < r.multi_bulk.size(); i += 2) + { + const Reply &key = r.multi_bulk[i], + &value = r.multi_bulk[i + 1]; + + std::vector args; + args.push_back("SREM"); + args.push_back("value:" + this->type + ":" + key.bulk + ":" + value.bulk); + args.push_back(stringify(this->id)); + + /* Delete value -> object id */ + me->redis->SendCommand(NULL, args); + } + + /* Add object id to id set for this type */ + std::vector args; + args.push_back("SADD"); + args.push_back("ids:" + this->type); + args.push_back(stringify(obj->id)); + me->redis->SendCommand(NULL, args); + + args.clear(); + args.push_back("HMSET"); + args.push_back("hash:" + this->type + ":" + stringify(obj->id)); + + typedef std::map items; + for (items::iterator it = data.data.begin(), it_end = data.data.end(); it != it_end; ++it) + { + const Anope::string &key = it->first; + std::stringstream *value = it->second; + + args.push_back(key); + args.push_back(value->str()); + + std::vector args2; + + args2.push_back("SADD"); + args2.push_back("value:" + this->type + ":" + key + ":" + value->str()); + args2.push_back(stringify(obj->id)); + + /* Add to value -> object id set */ + me->redis->SendCommand(NULL, args2); + } + + ++obj->redis_ignore; + + /* Add object */ + me->redis->SendCommand(NULL, args); + + /* Transaction end */ + me->redis->CommitTransaction(); + + delete this; +} + +void SubscriptionListener::OnResult(const Reply &r) +{ + /* + * [May 15 13:59:35.645839 2013] Debug: pmessage + * [May 15 13:59:35.645866 2013] Debug: __keyspace@*__:anope:hash:* + * [May 15 13:59:35.645880 2013] Debug: __keyspace@0__:anope:hash:type:id + * [May 15 13:59:35.645893 2013] Debug: hset + */ + if (r.multi_bulk.size() != 4) + return; + + size_t sz = r.multi_bulk[2].bulk.find(':'); + if (sz == Anope::string::npos) + return; + + const Anope::string &key = r.multi_bulk[2].bulk.substr(sz + 1), + &op = r.multi_bulk[3].bulk; + + sz = key.rfind(':'); + if (sz == Anope::string::npos) + return; + + const Anope::string &id = key.substr(sz + 1); + + size_t sz2 = key.rfind(':', sz - 1); + if (sz2 == Anope::string::npos) + return; + const Anope::string &type = key.substr(sz2 + 1, sz - sz2 - 1); + + Serialize::Type *s_type = Serialize::Type::Find(type); + + if (s_type == NULL) + return; + + uint64_t obj_id; + try + { + obj_id = convertTo(id); + } + catch (const ConvertException &) + { + return; + } + + if (op == "hset" || op == "hdel") + { + Serializable *s = s_type->objects[obj_id]; + + if (s && s->redis_ignore) + { + --s->redis_ignore; + Log(LOG_DEBUG) << "redis: notify: got modify for object id " << obj_id << " of type " << type << ", but I am ignoring it"; + } + else + { + Log(LOG_DEBUG) << "redis: notify: got modify for object id " << obj_id << " of type " << type; + + std::vector args; + args.push_back("HGETALL"); + args.push_back("hash:" + type + ":" + id); + + me->redis->SendCommand(new ModifiedObject(me, type, obj_id), args); + } + } + else if (op == "del") + { + Serializable* &s = s_type->objects[obj_id]; + if (s == NULL) + return; + + Log(LOG_DEBUG) << "redis: notify: deleting object id " << obj_id << " of type " << type; + + Data data; + + s->Serialize(data); + + /* Transaction start */ + me->redis->StartTransaction(); + + typedef std::map items; + for (items::iterator it = data.data.begin(), it_end = data.data.end(); it != it_end; ++it) + { + const Anope::string &key = it->first; + std::stringstream *value = it->second; + + std::vector args; + args.push_back("SREM"); + args.push_back("value:" + type + ":" + key + ":" + value->str()); + args.push_back(id); + + /* Delete value -> object id */ + me->redis->SendCommand(NULL, args); + } + + std::vector args; + args.push_back("SREM"); + args.push_back("ids:" + type); + args.push_back(stringify(s->id)); + + /* Delete object from id set */ + me->redis->SendCommand(NULL, args); + + /* Transaction end */ + me->redis->CommitTransaction(); + + delete s; + s = NULL; + } +} + +void ModifiedObject::OnResult(const Reply &r) +{ + Serialize::Type *st = Serialize::Type::Find(this->type); + + if (!st) + { + delete this; + return; + } + + Serializable* &obj = st->objects[this->id]; + + /* Transaction start */ + me->redis->StartTransaction(); + + /* Erase old object values */ + if (obj) + { + Data data; + + obj->Serialize(data); + + typedef std::map items; + for (items::iterator it = data.data.begin(), it_end = data.data.end(); it != it_end; ++it) + { + const Anope::string &key = it->first; + std::stringstream *value = it->second; + + std::vector args; + args.push_back("SREM"); + args.push_back("value:" + st->GetName() + ":" + key + ":" + value->str()); + args.push_back(stringify(this->id)); + + /* Delete value -> object id */ + me->redis->SendCommand(NULL, args); + } + } + + Data data; + + for (unsigned i = 0; i + 1 < r.multi_bulk.size(); i += 2) + { + const Reply &key = r.multi_bulk[i], + &value = r.multi_bulk[i + 1]; + + data[key.bulk] << value.bulk; + } + + obj = st->Unserialize(obj, data); + if (obj) + { + obj->id = this->id; + obj->UpdateCache(data); + + /* Insert new object values */ + typedef std::map items; + for (items::iterator it = data.data.begin(), it_end = data.data.end(); it != it_end; ++it) + { + const Anope::string &key = it->first; + std::stringstream *value = it->second; + + std::vector args; + args.push_back("SADD"); + args.push_back("value:" + st->GetName() + ":" + key + ":" + value->str()); + args.push_back(stringify(obj->id)); + + /* Add to value -> object id set */ + me->redis->SendCommand(NULL, args); + } + + std::vector args; + args.push_back("SADD"); + args.push_back("ids:" + st->GetName()); + args.push_back(stringify(obj->id)); + + /* Add to type -> id set */ + me->redis->SendCommand(NULL, args); + } + + /* Transaction end */ + me->redis->CommitTransaction(); + + delete this; +} + +MODULE_INIT(DatabaseRedis) -- cgit