diff options
author | Adam <Adam@anope.org> | 2014-11-24 14:27:23 -0500 |
---|---|---|
committer | Adam <Adam@anope.org> | 2014-11-24 14:27:23 -0500 |
commit | 42238034490fb5479d787bd1695750387d508200 (patch) | |
tree | c93c62e0e1c936e656ae5b9ee1b62380ce2a194c /modules/database/db_redis.cpp | |
parent | d492923610d9c9146b2a2b63de38deab2cfd4ca7 (diff) |
Rewrite serializable to have field level granularity
Represent serializable objects in a digraph, and as a result made most
object relationships implicitly defined, and use the graph to trace
references between objects to determine relationships. Edges may
also be marked as having a dependency of the object they point to,
which allows for automatic cleanup and deletion of most objects when
no longer needed.
Additionally, this allows not having to require in-memory copies of
everything when using external databases. db_sql has been rewritten
for this and now always requires a database to function. db_sql with
MySQL now requires InnoDB to make use of transactions and foreign
key constraints.
Diffstat (limited to 'modules/database/db_redis.cpp')
-rw-r--r-- | modules/database/db_redis.cpp | 703 |
1 files changed, 241 insertions, 462 deletions
diff --git a/modules/database/db_redis.cpp b/modules/database/db_redis.cpp index dd5ff0301..6105bfa65 100644 --- a/modules/database/db_redis.cpp +++ b/modules/database/db_redis.cpp @@ -16,100 +16,35 @@ using namespace Redis; class DatabaseRedis; static DatabaseRedis *me; -class Data : public Serialize::Data -{ - public: - std::map<Anope::string, std::stringstream *> data; - - ~Data() - { - for (std::map<Anope::string, std::stringstream *>::iterator it = data.begin(), it_end = data.end(); it != it_end; ++it) - delete it->second; - } - - std::iostream& operator[](const Anope::string &key) override - { - std::stringstream* &stream = data[key]; - if (!stream) - stream = new std::stringstream(); - return *stream; - } - - std::set<Anope::string> KeySet() const override - { - std::set<Anope::string> keys; - for (std::map<Anope::string, std::stringstream *>::const_iterator it = this->data.begin(), it_end = this->data.end(); it != it_end; ++it) - keys.insert(it->first); - return keys; - } - - size_t Hash() const override - { - size_t hash = 0; - for (std::map<Anope::string, std::stringstream *>::const_iterator it = this->data.begin(), it_end = this->data.end(); it != it_end; ++it) - if (!it->second->str().empty()) - hash ^= Anope::hash_cs()(it->second->str()); - return hash; - } -}; - class TypeLoader : public Interface { - Anope::string type; + Serialize::TypeBase *type; + public: - TypeLoader(Module *creator, const Anope::string &t) : Interface(creator), type(t) { } + TypeLoader(Module *creator, Serialize::TypeBase *t) : Interface(creator), type(t) { } void OnResult(const Reply &r) override; }; class ObjectLoader : public Interface { - Anope::string type; - int64_t id; + Serialize::Object *obj; public: - ObjectLoader(Module *creator, const Anope::string &t, int64_t i) : Interface(creator), type(t), id(i) { } + ObjectLoader(Module *creator, Serialize::Object *s) : Interface(creator), obj(s) { } void OnResult(const Reply &r) override; }; -class IDInterface : public Interface +class FieldLoader : public Interface { - Reference<Serializable> o; - public: - IDInterface(Module *creator, Serializable *obj) : Interface(creator), o(obj) { } - - void OnResult(const Reply &r) override; -}; + Serialize::Object *obj; + Serialize::FieldBase *field; -class Deleter : public Interface -{ - Anope::string type; - int64_t id; public: - Deleter(Module *creator, const Anope::string &t, int64_t i) : Interface(creator), type(t), id(i) { } + FieldLoader(Module *creator, Serialize::Object *o, Serialize::FieldBase *f) : Interface(creator), obj(o), field(f) { } - void OnResult(const Reply &r) override; -}; - -class Updater : public Interface -{ - Anope::string type; - int64_t id; - public: - Updater(Module *creator, const Anope::string &t, int64_t i) : Interface(creator), type(t), id(i) { } - - void OnResult(const Reply &r) override; -}; - -class ModifiedObject : public Interface -{ - Anope::string type; - int64_t id; - public: - ModifiedObject(Module *creator, const Anope::string &t, int64_t i) : Interface(creator), type(t), id(i) { } - - void OnResult(const Reply &r) override; + void OnResult(const Reply &) override; }; class SubscriptionListener : public Interface @@ -120,341 +55,263 @@ class SubscriptionListener : public Interface void OnResult(const Reply &r) override; }; -class DatabaseRedis : public Module, public Pipe +class DatabaseRedis : public Module , public EventHook<Event::LoadDatabase> - , public EventHook<Event::SerializeTypeCreate> - , public EventHook<Event::SerializableConstruct> - , public EventHook<Event::SerializableDestruct> - , public EventHook<Event::SerializableUpdate> + , public EventHook<Event::SerializeEvents> { SubscriptionListener sl; - std::set<Serializable *> updated_items; public: ServiceReference<Provider> redis; DatabaseRedis(const Anope::string &modname, const Anope::string &creator) : Module(modname, creator, DATABASE | VENDOR) , EventHook<Event::LoadDatabase>("OnLoadDatabase") - , EventHook<Event::SerializeTypeCreate>("OnSerializeTypeCreate") - , EventHook<Event::SerializableConstruct>("OnSerializableConstruct") - , EventHook<Event::SerializableDestruct>("OnSerializableDestruct") - , EventHook<Event::SerializableUpdate>("OnSerializableUpdate") + , EventHook<Event::SerializeEvents>("OnSerialize") , sl(this) { me = this; - - } - - /* Insert or update an object */ - void InsertObject(Serializable *obj) - { - Serialize::Type *t = obj->GetSerializableType(); - - /* If there is no id yet for ths object, get one */ - if (!obj->id) - redis->SendCommand(new IDInterface(this, obj), "INCR id:" + t->GetName()); - else - { - Data data; - obj->Serialize(data); - - if (obj->IsCached(data)) - return; - - obj->UpdateCache(data); - - std::vector<Anope::string> args; - args.push_back("HGETALL"); - args.push_back("hash:" + t->GetName() + ":" + stringify(obj->id)); - - /* Get object attrs to clear before updating */ - redis->SendCommand(new Updater(this, t->GetName(), obj->id), args); - } - } - - void OnNotify() override - { - for (std::set<Serializable *>::iterator it = this->updated_items.begin(), it_end = this->updated_items.end(); it != it_end; ++it) - { - Serializable *s = *it; - - this->InsertObject(s); - } - - this->updated_items.clear(); } void OnReload(Configuration::Conf *conf) override { Configuration::Block *block = conf->GetModule(this); - this->redis = ServiceReference<Provider>("Redis::Provider", block->Get<const Anope::string>("engine", "redis/main")); + this->redis = ServiceReference<Provider>("Redis::Provider", block->Get<Anope::string>("engine", "redis/main")); } EventReturn OnLoadDatabase() override { - const std::vector<Anope::string> type_order = Serialize::Type::GetTypeOrder(); - for (unsigned i = 0; i < type_order.size(); ++i) - { - Serialize::Type *sb = Serialize::Type::Find(type_order[i]); - this->OnSerializeTypeCreate(sb); - } + if (!redis) + return EVENT_STOP; + + const std::map<Anope::string, Serialize::TypeBase *> &types = Serialize::TypeBase::GetTypes(); + for (const std::pair<Anope::string, Serialize::TypeBase *> &p : types) + this->OnSerializeTypeCreate(p.second); while (redis->BlockAndProcess()); - redis->Subscribe(&this->sl, "__keyspace@*__:hash:*"); + redis->Subscribe(&this->sl, "anope"); return EVENT_STOP; } - void OnSerializeTypeCreate(Serialize::Type *sb) override + void OnSerializeTypeCreate(Serialize::TypeBase *sb) { - if (!redis) - return; - - std::vector<Anope::string> args; - args.push_back("SMEMBERS"); - args.push_back("ids:" + sb->GetName()); + std::vector<Anope::string> args = { "SMEMBERS", "ids:" + sb->GetName() }; - redis->SendCommand(new TypeLoader(this, sb->GetName()), args); + redis->SendCommand(new TypeLoader(this, sb), args); } - void OnSerializableConstruct(Serializable *obj) override + EventReturn OnSerializeList(Serialize::TypeBase *type, std::vector<Serialize::ID> &ids) override { - this->updated_items.insert(obj); - this->Notify(); + return EVENT_CONTINUE; } - void OnSerializableDestruct(Serializable *obj) override + EventReturn OnSerializeFind(Serialize::TypeBase *type, Serialize::FieldBase *field, const Anope::string &value, Serialize::ID &id) override { - Serialize::Type *t = obj->GetSerializableType(); - - std::vector<Anope::string> args; - args.push_back("HGETALL"); - args.push_back("hash:" + t->GetName() + ":" + stringify(obj->id)); + return EVENT_CONTINUE; + } - /* Get all of the attributes for this object */ - redis->SendCommand(new Deleter(this, t->GetName(), obj->id), args); + EventReturn OnSerializeGet(Serialize::Object *object, Serialize::FieldBase *field, Anope::string &value) override + { + return EVENT_CONTINUE; + } - this->updated_items.erase(obj); - t->objects.erase(obj->id); - this->Notify(); + EventReturn OnSerializeGetRefs(Serialize::Object *object, Serialize::TypeBase *type, std::vector<Serialize::Edge> &) override + { + return EVENT_CONTINUE; } - void OnSerializableUpdate(Serializable *obj) override + EventReturn OnSerializeDeref(Serialize::ID id, Serialize::TypeBase *type) override { - this->updated_items.insert(obj); - this->Notify(); + return EVENT_CONTINUE; } -}; -void TypeLoader::OnResult(const Reply &r) -{ - if (r.type != Reply::MULTI_BULK || !me->redis) + EventReturn OnSerializeGetSerializable(Serialize::Object *object, Serialize::FieldBase *field, Anope::string &type, Serialize::ID &value) override { - delete this; - return; + return EVENT_CONTINUE; } - for (unsigned i = 0; i < r.multi_bulk.size(); ++i) + EventReturn OnSerializeSet(Serialize::Object *object, Serialize::FieldBase *field, const Anope::string &value) override { - const Reply *reply = r.multi_bulk[i]; + std::vector<Anope::string> args; - if (reply->type != Reply::BULK) - continue; + redis->StartTransaction(); - int64_t id; - try - { - id = convertTo<int64_t>(reply->bulk); - } - catch (const ConvertException &) - { - continue; - } + const Anope::string &old = field->SerializeToString(object); + args = { "SREM", "lookup:" + object->GetSerializableType()->GetName() + ":" + field->GetName() + ":" + old, stringify(object->id) }; + redis->SendCommand(nullptr, args); - std::vector<Anope::string> args; - args.push_back("HGETALL"); - args.push_back("hash:" + this->type + ":" + stringify(id)); + // add object to type set + args = { "SADD", "ids:" + object->GetSerializableType()->GetName(), stringify(object->id) }; + redis->SendCommand(nullptr, args); - me->redis->SendCommand(new ObjectLoader(me, this->type, id), args); - } + // add key to key set + args = { "SADD", "keys:" + stringify(object->id), field->GetName() }; + redis->SendCommand(nullptr, args); - delete this; -} + // set value + args = { "SET", "values:" + stringify(object->id) + ":" + field->GetName(), value }; + redis->SendCommand(nullptr, args); -void ObjectLoader::OnResult(const Reply &r) -{ - Serialize::Type *st = Serialize::Type::Find(this->type); + // lookup + args = { "SADD", "lookup:" + object->GetSerializableType()->GetName() + ":" + field->GetName() + ":" + value, stringify(object->id) }; + redis->SendCommand(nullptr, args); - if (r.type != Reply::MULTI_BULK || r.multi_bulk.empty() || !me->redis || !st) - { - delete this; - return; + redis->CommitTransaction(); + + return EVENT_CONTINUE; } - Data data; + EventReturn OnSerializeSetSerializable(Serialize::Object *object, Serialize::FieldBase *field, Serialize::Object *value) override + { + return OnSerializeSet(object, field, stringify(value->id)); + } - for (unsigned i = 0; i + 1 < r.multi_bulk.size(); i += 2) + EventReturn OnSerializeUnset(Serialize::Object *object, Serialize::FieldBase *field) override { - const Reply *key = r.multi_bulk[i], - *value = r.multi_bulk[i + 1]; + std::vector<Anope::string> args; - data[key->bulk] << value->bulk; + redis->StartTransaction(); + + const Anope::string &old = field->SerializeToString(object); + args = { "SREM", "lookup:" + object->GetSerializableType()->GetName() + ":" + field->GetName() + ":" + old, stringify(object->id) }; + redis->SendCommand(nullptr, args); + + // remove field from set + args = { "SREM", "keys:" + stringify(object->id), field->GetName() }; + redis->SendCommand(nullptr, args); + + redis->CommitTransaction(); + + return EVENT_CONTINUE; } - Serializable* &obj = st->objects[this->id]; - obj = st->Unserialize(obj, data); - if (obj) + EventReturn OnSerializeUnsetSerializable(Serialize::Object *object, Serialize::FieldBase *field) override { - obj->id = this->id; - obj->UpdateCache(data); + return OnSerializeUnset(object, field); } - delete this; -} - -void IDInterface::OnResult(const Reply &r) -{ - if (!o || r.type != Reply::INT || !r.i) + EventReturn OnSerializeHasField(Serialize::Object *object, Serialize::FieldBase *field) override { - delete this; - return; + return EVENT_CONTINUE; } - Serializable* &obj = o->GetSerializableType()->objects[r.i]; - if (obj) - /* This shouldn't be possible */ - obj->id = 0; - - o->id = r.i; - obj = o; + EventReturn OnSerializableGetId(Serialize::ID &id) override + { + std::vector<Anope::string> args = { "INCR", "id" }; - /* Now that we have the id, insert this object for real */ - anope_dynamic_static_cast<DatabaseRedis *>(this->owner)->InsertObject(o); + auto f = [&](const Reply &r) + { + id = r.i; + }; - delete this; -} + FInterface inter(this, f); + redis->SendCommand(&inter, args); + while (redis->BlockAndProcess()); + return EVENT_ALLOW; + } -void Deleter::OnResult(const Reply &r) -{ - if (r.type != Reply::MULTI_BULK || !me->redis || r.multi_bulk.empty()) + void OnSerializableCreate(Serialize::Object *) override { - delete this; - return; } - /* Transaction start */ - me->redis->StartTransaction(); + void OnSerializableDelete(Serialize::Object *obj) override + { + std::vector<Anope::string> args; - std::vector<Anope::string> args; - args.push_back("DEL"); - args.push_back("hash:" + this->type + ":" + stringify(this->id)); + redis->StartTransaction(); - /* Delete hash object */ - me->redis->SendCommand(NULL, args); + for (Serialize::FieldBase *field : obj->GetSerializableType()->fields) + { + Anope::string value = field->SerializeToString(obj); - args.clear(); - args.push_back("SREM"); - args.push_back("ids:" + this->type); - args.push_back(stringify(this->id)); + args = { "SREM", "lookup:" + obj->GetSerializableType()->GetName() + ":" + field->GetName() + ":" + value, stringify(obj->id) }; + redis->SendCommand(nullptr, args); - /* Delete id from ids set */ - me->redis->SendCommand(NULL, args); + args = { "DEL", "values:" + stringify(obj->id) + ":" + field->GetName() }; + redis->SendCommand(nullptr, args); - for (unsigned i = 0; i + 1 < r.multi_bulk.size(); i += 2) - { - const Reply *key = r.multi_bulk[i], - *value = r.multi_bulk[i + 1]; + args = { "SREM", "keys:" + stringify(obj->id), field->GetName() }; + redis->SendCommand(nullptr, args); + } - args.clear(); - args.push_back("SREM"); - args.push_back("value:" + this->type + ":" + key->bulk + ":" + value->bulk); - args.push_back(stringify(this->id)); + args = { "SREM", "ids:" + obj->GetSerializableType()->GetName(), stringify(obj->id) }; + redis->SendCommand(nullptr, args); - /* Delete value -> object id */ - me->redis->SendCommand(NULL, args); + redis->CommitTransaction(); } +}; - /* Transaction end */ - me->redis->CommitTransaction(); - - delete this; -} - -void Updater::OnResult(const Reply &r) +void TypeLoader::OnResult(const Reply &r) { - Serialize::Type *st = Serialize::Type::Find(this->type); - - if (!st) + if (r.type != Reply::MULTI_BULK || !me->redis) { delete this; return; } - Serializable *obj = st->objects[this->id]; - if (!obj) + for (unsigned i = 0; i < r.multi_bulk.size(); ++i) { - delete this; - return; - } + const Reply *reply = r.multi_bulk[i]; - Data data; - obj->Serialize(data); + if (reply->type != Reply::BULK) + continue; - /* Transaction start */ - me->redis->StartTransaction(); + int64_t id; + try + { + id = convertTo<int64_t>(reply->bulk); + } + catch (const ConvertException &) + { + continue; + } - for (unsigned i = 0; i + 1 < r.multi_bulk.size(); i += 2) - { - const Reply *key = r.multi_bulk[i], - *value = r.multi_bulk[i + 1]; + Serialize::Object *obj = type->Require(id); + if (obj == nullptr) + { + Log(LOG_DEBUG) << "redis: Unable to require object #" << id << " of type " << type->GetName(); + continue; + } - std::vector<Anope::string> args; - args.push_back("SREM"); - args.push_back("value:" + this->type + ":" + key->bulk + ":" + value->bulk); - args.push_back(stringify(this->id)); + std::vector<Anope::string> args = { "SMEMBERS", "keys:" + stringify(id) }; - /* Delete value -> object id */ - me->redis->SendCommand(NULL, args); + me->redis->SendCommand(new ObjectLoader(me, obj), args); } - /* Add object id to id set for this type */ - std::vector<Anope::string> args; - args.push_back("SADD"); - args.push_back("ids:" + this->type); - args.push_back(stringify(obj->id)); - me->redis->SendCommand(NULL, args); - - args.clear(); - args.push_back("HMSET"); - args.push_back("hash:" + this->type + ":" + stringify(obj->id)); + delete this; +} - typedef std::map<Anope::string, std::stringstream *> items; - for (items::iterator it = data.data.begin(), it_end = data.data.end(); it != it_end; ++it) +void ObjectLoader::OnResult(const Reply &r) +{ + if (r.type != Reply::MULTI_BULK || r.multi_bulk.empty() || !me->redis) { - const Anope::string &key = it->first; - std::stringstream *value = it->second; + delete this; + return; + } - args.push_back(key); - args.push_back(value->str()); + Serialize::TypeBase *type = obj->GetSerializableType(); - std::vector<Anope::string> args2; + for (Reply *reply : r.multi_bulk) + { + const Anope::string &key = reply->bulk; + Serialize::FieldBase *field = type->GetField(key); - args2.push_back("SADD"); - args2.push_back("value:" + this->type + ":" + key + ":" + value->str()); - args2.push_back(stringify(obj->id)); + if (field == nullptr) + continue; - /* Add to value -> object id set */ - me->redis->SendCommand(NULL, args2); - } + std::vector<Anope::string> args = { "GET", "values:" + stringify(obj->id) + ":" + key }; - ++obj->redis_ignore; + me->redis->SendCommand(new FieldLoader(me, obj, field), args); + } - /* Add object */ - me->redis->SendCommand(NULL, args); + delete this; +} - /* Transaction end */ - me->redis->CommitTransaction(); +void FieldLoader::OnResult(const Reply &r) +{ + Log(LOG_DEBUG_2) << "redis: Setting field " << field->GetName() << " of object #" << obj->id << " of type " << obj->GetSerializableType()->GetName() << " to " << r.bulk; + field->UnserializeFromString(obj, r.bulk); delete this; } @@ -462,196 +319,118 @@ void Updater::OnResult(const Reply &r) void SubscriptionListener::OnResult(const Reply &r) { /* - * [May 15 13:59:35.645839 2013] Debug: pmessage - * [May 15 13:59:35.645866 2013] Debug: __keyspace@*__:anope:hash:* - * [May 15 13:59:35.645880 2013] Debug: __keyspace@0__:anope:hash:type:id - * [May 15 13:59:35.645893 2013] Debug: hset + * message + * anope + * message + * + * set 4 email adam@anope.org + * unset 4 email + * create 4 NickCore + * delete 4 */ - if (r.multi_bulk.size() != 4) - return; - - size_t sz = r.multi_bulk[2]->bulk.find(':'); - if (sz == Anope::string::npos) - return; - const Anope::string &key = r.multi_bulk[2]->bulk.substr(sz + 1), - &op = r.multi_bulk[3]->bulk; + const Anope::string &message = r.multi_bulk[2]->bulk; + Anope::string command; + spacesepstream sep(message); - sz = key.rfind(':'); - if (sz == Anope::string::npos) - return; - - const Anope::string &id = key.substr(sz + 1); - - size_t sz2 = key.rfind(':', sz - 1); - if (sz2 == Anope::string::npos) - return; - const Anope::string &type = key.substr(sz2 + 1, sz - sz2 - 1); + sep.GetToken(command); - Serialize::Type *s_type = Serialize::Type::Find(type); - - if (s_type == NULL) - return; - - uint64_t obj_id; - try + if (command == "set" || command == "unset") { - obj_id = convertTo<uint64_t>(id); - } - catch (const ConvertException &) - { - return; - } + Anope::string sid, key, value; - if (op == "hset" || op == "hdel") - { - Serializable *s = s_type->objects[obj_id]; + sep.GetToken(sid); + sep.GetToken(key); + value = sep.GetRemaining(); - if (s && s->redis_ignore) + Serialize::ID id; + try { - --s->redis_ignore; - Log(LOG_DEBUG) << "redis: notify: got modify for object id " << obj_id << " of type " << type << ", but I am ignoring it"; + id = convertTo<Serialize::ID>(sid); } - else + catch (const ConvertException &ex) { - Log(LOG_DEBUG) << "redis: notify: got modify for object id " << obj_id << " of type " << type; - - std::vector<Anope::string> args; - args.push_back("HGETALL"); - args.push_back("hash:" + type + ":" + id); - - me->redis->SendCommand(new ModifiedObject(me, type, obj_id), args); - } - } - else if (op == "del") - { - Serializable* &s = s_type->objects[obj_id]; - if (s == NULL) + Log(LOG_DEBUG) << "redis: unable to get id for SL update key " << sid; return; + } - Log(LOG_DEBUG) << "redis: notify: deleting object id " << obj_id << " of type " << type; - - Data data; - - s->Serialize(data); - - /* Transaction start */ - me->redis->StartTransaction(); - - typedef std::map<Anope::string, std::stringstream *> items; - for (items::iterator it = data.data.begin(), it_end = data.data.end(); it != it_end; ++it) + Serialize::Object *obj = Serialize::GetID(id); + if (obj == nullptr) { - const Anope::string &k = it->first; - std::stringstream *value = it->second; - - std::vector<Anope::string> args; - args.push_back("SREM"); - args.push_back("value:" + type + ":" + k + ":" + value->str()); - args.push_back(id); - - /* Delete value -> object id */ - me->redis->SendCommand(NULL, args); + Log(LOG_DEBUG) << "redis: pmessage for unknown object #" << id; + return; } - std::vector<Anope::string> args; - args.push_back("SREM"); - args.push_back("ids:" + type); - args.push_back(stringify(s->id)); - - /* Delete object from id set */ - me->redis->SendCommand(NULL, args); - - /* Transaction end */ - me->redis->CommitTransaction(); - - delete s; - s = NULL; - } -} - -void ModifiedObject::OnResult(const Reply &r) -{ - Serialize::Type *st = Serialize::Type::Find(this->type); + Serialize::FieldBase *field = obj->GetSerializableType()->GetField(key); + if (field == nullptr) + { + Log(LOG_DEBUG) << "redis: pmessage for unknown field of object #" << id << ": " << key; + return; + } - if (!st) - { - delete this; - return; + Log(LOG_DEBUG_2) << "redis: Setting field " << field->GetName() << " of object #" << obj->id << " of type " << obj->GetSerializableType()->GetName() << " to " << value; + field->UnserializeFromString(obj, value); } - - Serializable* &obj = st->objects[this->id]; - - /* Transaction start */ - me->redis->StartTransaction(); - - /* Erase old object values */ - if (obj) + else if (command == "create") { - Data data; + Anope::string sid, stype; - obj->Serialize(data); + sep.GetToken(sid); + sep.GetToken(stype); - typedef std::map<Anope::string, std::stringstream *> items; - for (items::iterator it = data.data.begin(), it_end = data.data.end(); it != it_end; ++it) + Serialize::ID id; + try + { + id = convertTo<Serialize::ID>(sid); + } + catch (const ConvertException &ex) { - const Anope::string &key = it->first; - std::stringstream *value = it->second; + Log(LOG_DEBUG) << "redis: unable to get id for SL update key " << sid; + return; + } - std::vector<Anope::string> args; - args.push_back("SREM"); - args.push_back("value:" + st->GetName() + ":" + key + ":" + value->str()); - args.push_back(stringify(this->id)); + Serialize::TypeBase *type = Serialize::TypeBase::Find(stype); + if (type == nullptr) + { + Log(LOG_DEBUG) << "redis: pmessage create for nonexistant type " << stype; + return; + } - /* Delete value -> object id */ - me->redis->SendCommand(NULL, args); + Serialize::Object *obj = type->Require(id); + if (obj == nullptr) + { + Log(LOG_DEBUG) << "redis: require for pmessage create type " << type->GetName() << " id #" << id << " returned nullptr"; + return; } } - - Data data; - - for (unsigned i = 0; i + 1 < r.multi_bulk.size(); i += 2) + else if (command == "delete") { - const Reply *key = r.multi_bulk[i], - *value = r.multi_bulk[i + 1]; - - data[key->bulk] << value->bulk; - } + Anope::string sid; - obj = st->Unserialize(obj, data); - if (obj) - { - obj->id = this->id; - obj->UpdateCache(data); + sep.GetToken(sid); - /* Insert new object values */ - typedef std::map<Anope::string, std::stringstream *> items; - for (items::iterator it = data.data.begin(), it_end = data.data.end(); it != it_end; ++it) + Serialize::ID id; + try { - const Anope::string &key = it->first; - std::stringstream *value = it->second; - - std::vector<Anope::string> args; - args.push_back("SADD"); - args.push_back("value:" + st->GetName() + ":" + key + ":" + value->str()); - args.push_back(stringify(obj->id)); - - /* Add to value -> object id set */ - me->redis->SendCommand(NULL, args); + id = convertTo<Serialize::ID>(sid); + } + catch (const ConvertException &ex) + { + Log(LOG_DEBUG) << "redis: unable to get id for SL update key " << sid; + return; } - std::vector<Anope::string> args; - args.push_back("SADD"); - args.push_back("ids:" + st->GetName()); - args.push_back(stringify(obj->id)); + Serialize::Object *obj = Serialize::GetID(id); + if (obj == nullptr) + { + Log(LOG_DEBUG) << "redis: message for unknown object #" << id; + return; + } - /* Add to type -> id set */ - me->redis->SendCommand(NULL, args); + obj->Delete(); } - - /* Transaction end */ - me->redis->CommitTransaction(); - - delete this; + else + Log(LOG_DEBUG) << "redis: unknown message: " << message; } MODULE_INIT(DatabaseRedis) |