diff options
-rw-r--r-- | data/example.conf | 30 | ||||
-rw-r--r-- | data/modules.example.conf | 25 | ||||
-rw-r--r-- | docs/REDIS | 160 | ||||
-rw-r--r-- | include/modules.h | 2 | ||||
-rw-r--r-- | include/modules/redis.h | 62 | ||||
-rw-r--r-- | include/serialize.h | 7 | ||||
-rw-r--r-- | include/sockets.h | 4 | ||||
-rw-r--r-- | modules/database/db_redis.cpp | 648 | ||||
-rw-r--r-- | modules/database/db_sql_live.cpp | 4 | ||||
-rw-r--r-- | modules/m_redis.cpp | 620 | ||||
-rw-r--r-- | src/serialize.cpp | 4 | ||||
-rw-r--r-- | src/socket_clients.cpp | 4 |
12 files changed, 1553 insertions, 17 deletions
diff --git a/data/example.conf b/data/example.conf index f1433b7d1..aea4c07f3 100644 --- a/data/example.conf +++ b/data/example.conf @@ -22,7 +22,7 @@ * } * * Note that nameless blocks are allowed and are often used with comments to allow - * easialy commenting an entire block, for example: + * easily commenting an entire block, for example: * #foobar * { * moo = "cow" @@ -1082,7 +1082,7 @@ mail #module { name = "db_plain" } /* - * db_flatfile + * [RECOMMENDED] db_flatfile * * This is the default flatfile database format. */ @@ -1124,6 +1124,11 @@ module * This is only useful with very large databases, with hundreds * of thousands of objects, that have a noticeable delay from * writing databases. + * + * If your database is large enough cause a noticible delay when + * saving you should consider a more powerful alternative such + * as db_sql or db_redis, which incrementally update their + * databases asynchronously in real time. */ fork = no } @@ -1173,6 +1178,27 @@ module } /* + * db_redis. + * + * This module allows using Redis (http://redis.io) as a database backend. + * This module requires that m_redis is loaded and configured properly. + * + * Redis 2.8 supports keyspace notifications which allows Redis to push notifications + * to Anope about outside modifications to the database. This module supports this and + * will internally reflect any changes made to the database immediately once notified. + * See docs/REDIS for more information regarding this. + */ +#module +{ + name = "db_redis" + + /* + * Redis database to use. This must be configured with m_redis. + */ + engine = "redis/main" +} + +/* * [RECOMMENDED] Encryption modules. * * The encryption modules are used when dealing with passwords. This determines how diff --git a/data/modules.example.conf b/data/modules.example.conf index 01730ce29..2129d3ac9 100644 --- a/data/modules.example.conf +++ b/data/modules.example.conf @@ -504,6 +504,31 @@ module { name = "help" } } /* + * m_redis + * + * This module allows other modules to use Redis. + */ +#module +{ + name = "m_redis" + + /* A redis database */ + redis + { + /* The name of this service */ + name = "redis/main" + + /* + * The redis database to use. New connections default to 0. + */ + db = 0 + + ip = "127.0.0.1" + port = 6379 + } +} + +/* * m_regex_pcre * * Provides the regex engine regex/pcre, which uses the Perl Compatible Regular Expressions library. diff --git a/docs/REDIS b/docs/REDIS new file mode 100644 index 000000000..8c40d5690 --- /dev/null +++ b/docs/REDIS @@ -0,0 +1,160 @@ +Starting in Anope 1.9.9, Anope has Redis database support (http://redis.io/). +This document explains the data structure used by Anope, and explains how +keyspace notification works. + +This is not a tutorial on how to use Redis, see http://redis.io/documentation +for that. + +Table of Contents +----------------- +1) Data structure +2) Keyspace notifications +3) Examples of modfying, deleting, and creating objects + +1) Data structure + + There are 4 key namespaces in Anope, they are: + + id - The keys in id are used to atomically create object ids for new + objects. For example, if I were to create a new BotInfo I would first: + + redis 127.0.0.1:6379> INCR id:BotInfo + + To get the object ID of the new object. + + ids - The keys in ids contain a set of all object ids of the given type. + For example: + + redis 127.0.0.1:6379> SMEMBERS ids:BotInfo + + Returns "1", "2", "3", "4", "5", "6", "7", "8" because I have 8 bots that + have IDs 1, 2, 3, 4, 5, 6, 7, and 8, respectively. + + hash - The keys in hash are the actual objects, stored as hashes. For + example, if I had just looked up all BotInfo ids and wanted to iterate + over all of them, I woulld start by: + + redis 127.0.0.1:6379> HGETALL hash:BotInfo:1 + + Which gets all keys and values from the hash of type BotInfo with id 1. + This may return: + + "nick" -> "BotServ" + "user" -> "services" + "host" -> "services.anope.org" + "created" -> "1368704765" + + value - The keys in value only exist to aid looking up object IDs. They + are sets of object IDs and are used to map key+value pairs to objects. + For example: + + redis 127.0.0.1:6379> SMEMBERS value:NickAlias:nick:Adam + + Returns a set of object ids of NickAlias objects that have the key + 'nick' set to the value 'Adam' in its hash. Clearly this can only + ever contain at most one object, since it is not possible to have + more than one registered nick with the same name, but other keys + will contain more than one, such as: + + redis 127.0.0.1:6379> SMEMBERS value:NickCore:email:adam@anope.org + + Which would return all accounts with the email "adam@anope.org". + + redis 127.0.0.1:6379> SMEMBERS value:ChanAccess:mask:Adam + + Which would return all access entries set on the account "Adam". + + Behavior similar to SQL's AND, can be achieved using the + SINTER command, which does set intersection on one or more sets. + +2) Keyspace notifications + + Redis 2.7 (unstable) and 2.8 (stable) and newer support keyspace notifications + (http://redis.io/topics/notifications). This allows Redis to notify Anope of + any external changes to objects in the database. Once notified, Anope will + immediately update the object. Otherwise, Anope keeps all objects in memory + and will not regularly read from the databaes once started. + + You can use this to modify objects in Redis and have them immediately reflected + back into Anope. Additionally you can use this feature to run multiple Anope + instances simultaneously from the same database (see also, Redis database + replication). + + To use keyspace notifications you MUST execute + + redis 127.0.0.1:6379> CONFIG SET notify-keyspace-events KA + OK + + or set notify-keyspace-events in redis.conf properly. Anope always executes + CONFIG SET when it first connects. + + If you do not enable keyspace events properly Anope will be UNABLE to see any + object modifications you do. + + The key space ids and value are managed entirely by Anope, you do + not (and should not) modify them. Once you modify the object (hash), Anope will + update them for you to correctly refelect any changes made to the object. + + Finally, always use atomic operations. If you are inserting a new object with + multiple commands, or inserting multiple objects at once, specifically if the + objects depend on each other, you MUST use a transaction. + +3) Examples of modfying, deleting, and creating objects + + These examples will ONLY work if you meet the criteria in section 2. + + If I want to change the email account 'Adam' to 'Adam@anope.org', I would execute the following: + + redis 127.0.0.1:6379> SMEMBERS value:NickCore:display:Adam + + Which returns a value of "1", which is the object id I want to modify. + Now to change the email: + + redis 127.0.0.1:6379> HSET hash:NickCore:1 email Adam@anope.org + + You can now see this in NickServ's INFO command: + -NickServ- Email address: Adam@anope.org + + If I want to drop the account "Adam", I would execute the following: + + redis 127.0.0.1:6379> SMEMBERS value:NickCore:display:Adam + + Which returns a value of "1". I would then check: + + redis 127.0.0.1:6379> SMEMBERS value:NickAlias:nc:Adam + + To see what nicknames depend on this account to exist, as I will + have to remove those too. This returns the values "2", and "3". + + Finally, I can drop the nick using a transaction via: + + redis 127.0.0.1:6379> MULTI + OK + redis 127.0.0.1:6379> DEL hash:NickAlias:2 + QUEUED + redis 127.0.0.1:6379> DEL hash:NickAlias:3 + QUEUED + redis 127.0.0.1:6379> DEL hash:NickCore:1 + QUEUED + redis 127.0.0.1:6379> EXEC + + Or alternatively simply: + + redis 127.0.0.1:6379> DEL hash:NickAlias:2 hash:NickAlias:3 hash:NickCore:1 + + If I wanted to create a BotServ bot, I would execute the following: + + redis 127.0.0.1:6379> INCR id:BotInfo + + Which returns a new object ID for me, in this example it will be "8". + Now I can create the object: + + HMSET hash:BotInfo:8 nick redis user redis host services.anope.org realname "Services for IRC Networks" + + Note if you are using HSET instead of HMSET you will need to use a transaction, as shown in the above example. + If you are watching your services logs you will immediatly see: + + USERS: redis!redis@services.anope.org (Services for IRC Networks) connected to the network (services.anope.org) + + And the bot redis will be in BotServ's bot list. + Notice how ids:BotInfo and the value keys are updated automatically. diff --git a/include/modules.h b/include/modules.h index 5585e9e77..31b09c2cf 100644 --- a/include/modules.h +++ b/include/modules.h @@ -368,7 +368,7 @@ class CoreExport Module : public Extensible virtual EventReturn OnSaveDatabase() { return EVENT_CONTINUE; } /** Called when the databases are loaded - * @return EVENT_CONTINUE to let other modules continue saving, EVENT_STOP to stop + * @return EVENT_CONTINUE to let other modules continue loading, EVENT_STOP to stop */ virtual EventReturn OnLoadDatabase() { return EVENT_CONTINUE; } diff --git a/include/modules/redis.h b/include/modules/redis.h new file mode 100644 index 000000000..4b321a608 --- /dev/null +++ b/include/modules/redis.h @@ -0,0 +1,62 @@ +/* + * + * (C) 2003-2013 Anope Team + * Contact us at team@anope.org + * + * Please read COPYING and README for further details. + * + */ + +namespace Redis +{ + struct Reply + { + enum Type + { + NOT_PARSED, + ERROR, + OK, + INT, + BULK, + MULTI_BULK + } + type; + + Reply() { Clear(); } + void Clear() { type = NOT_PARSED; i = 0; bulk.clear(); multi_bulk_size = 0; multi_bulk.clear(); } + + int64_t i; + Anope::string bulk; + int multi_bulk_size; + std::deque<Reply> multi_bulk; + }; + + class Interface + { + public: + Module *owner; + + Interface(Module *m) : owner(m) { } + + virtual void OnResult(const Reply &r) = 0; + virtual void OnError(const Anope::string &error) { Log(owner) << error; } + }; + + class Provider : public Service + { + public: + Provider(Module *c, const Anope::string &n) : Service(c, "Redis::Provider", n) { } + + virtual void SendCommand(Interface *i, const std::vector<Anope::string> &cmds) = 0; + virtual void SendCommand(Interface *i, const Anope::string &str) = 0; + + virtual bool BlockAndProcess() = 0; + + virtual void Subscribe(Interface *i, const Anope::string &pattern) = 0; + virtual void Unsubscribe(const Anope::string &pattern) = 0; + + virtual void StartTransaction() = 0; + virtual void CommitTransaction() = 0; + }; +} + diff --git a/include/serialize.h b/include/serialize.h index a43046b1e..472701932 100644 --- a/include/serialize.h +++ b/include/serialize.h @@ -80,7 +80,10 @@ class CoreExport Serializable : public virtual Base virtual ~Serializable(); /* Unique ID (per type, not globally) for this object */ - unsigned int id; + uint64_t id; + + /* Only used by redis, to ignore updates */ + unsigned short redis_ignore; /** Marks the object as potentially being updated "soon". */ @@ -129,7 +132,7 @@ class CoreExport Serialize::Type public: /* Map of Serializable::id to Serializable objects */ - std::map<unsigned int, Serializable *> objects; + std::map<uint64_t, Serializable *> objects; /** Creates a new serializable type * @param n Type name diff --git a/include/sockets.h b/include/sockets.h index 8b1abe5c2..8b7b78a7f 100644 --- a/include/sockets.h +++ b/include/sockets.h @@ -388,10 +388,6 @@ class CoreExport ConnectionSocket : public virtual Socket /* Sockaddrs for connection ip/port */ sockaddrs conaddr; - /** Constructor - */ - ConnectionSocket(); - /** Connect the socket * @param TargetHost The target host to connect to * @param Port The target port to connect to diff --git a/modules/database/db_redis.cpp b/modules/database/db_redis.cpp new file mode 100644 index 000000000..03c7dabca --- /dev/null +++ b/modules/database/db_redis.cpp @@ -0,0 +1,648 @@ +/* + * + * (C) 2003-2013 Anope Team + * Contact us at team@anope.org + * + * Please read COPYING and README for further details. + * + * + */ + +#include "module.h" +#include "modules/redis.h" + +using namespace Redis; + +class DatabaseRedis; +static DatabaseRedis *me; + +class Data : public Serialize::Data +{ + public: + std::map<Anope::string, std::stringstream *> data; + + ~Data() + { + for (std::map<Anope::string, std::stringstream *>::iterator it = data.begin(), it_end = data.end(); it != it_end; ++it) + delete it->second; + } + + std::iostream& operator[](const Anope::string &key) anope_override + { + std::stringstream* &stream = data[key]; + if (!stream) + stream = new std::stringstream(); + return *stream; + } + + std::set<Anope::string> KeySet() const anope_override + { + std::set<Anope::string> keys; + for (std::map<Anope::string, std::stringstream *>::const_iterator it = this->data.begin(), it_end = this->data.end(); it != it_end; ++it) + keys.insert(it->first); + return keys; + } + + size_t Hash() const anope_override + { + size_t hash = 0; + for (std::map<Anope::string, std::stringstream *>::const_iterator it = this->data.begin(), it_end = this->data.end(); it != it_end; ++it) + if (!it->second->str().empty()) + hash ^= Anope::hash_cs()(it->second->str()); + return hash; + } +}; + +class TypeLoader : public Interface +{ + Anope::string type; + public: + TypeLoader(Module *creator, const Anope::string &t) : Interface(creator), type(t) { } + + void OnResult(const Reply &r) anope_override; +}; + +class ObjectLoader : public Interface +{ + Anope::string type; + int64_t id; + + public: + ObjectLoader(Module *creator, const Anope::string &t, int64_t i) : Interface(creator), type(t), id(i) { } + + void OnResult(const Reply &r) anope_override; +}; + +class IDInterface : public Interface +{ + Reference<Serializable> o; + public: + IDInterface(Module *creator, Serializable *obj) : Interface(creator), o(obj) { } + + void OnResult(const Reply &r) anope_override; +}; + +class Deleter : public Interface +{ + Anope::string type; + int64_t id; + public: + Deleter(Module *creator, const Anope::string &t, int64_t i) : Interface(creator), type(t), id(i) { } + + void OnResult(const Reply &r) anope_override; +}; + +class Updater : public Interface +{ + Anope::string type; + int64_t id; + public: + Updater(Module *creator, const Anope::string &t, int64_t i) : Interface(creator), type(t), id(i) { } + + void OnResult(const Reply &r) anope_override; +}; + +class ModifiedObject : public Interface +{ + Anope::string type; + int64_t id; + public: + ModifiedObject(Module *creator, const Anope::string &t, int64_t i) : Interface(creator), type(t), id(i) { } + + void OnResult(const Reply &r) anope_override; +}; + +class SubscriptionListener : public Interface +{ + public: + SubscriptionListener(Module *creator) : Interface(creator) { } + + void OnResult(const Reply &r) anope_override; +}; + +class DatabaseRedis : public Module, public Pipe +{ + SubscriptionListener sl; + std::set<Serializable *> updated_items; + + public: + ServiceReference<Provider> redis; + + DatabaseRedis(const Anope::string &modname, const Anope::string &creator) : Module(modname, creator, DATABASE | VENDOR), sl(this) + { + me = this; + + Implementation i[] = { I_OnReload, I_OnLoadDatabase, I_OnSerializeTypeCreate, I_OnSerializableConstruct, I_OnSerializableDestruct, I_OnSerializableUpdate }; + ModuleManager::Attach(i, this, sizeof(i) / sizeof(Implementation)); + } + + /* Insert or update an object */ + void InsertObject(Serializable *obj) + { + Serialize::Type *type = obj->GetSerializableType(); + + /* If there is no id yet for ths object, get one */ + if (!obj->id) + redis->SendCommand(new IDInterface(this, obj), "INCR id:" + type->GetName()); + else + { + Data data; + obj->Serialize(data); + + if (obj->IsCached(data)) + return; + + obj->UpdateCache(data); + + std::vector<Anope::string> args; + args.push_back("HGETALL"); + args.push_back("hash:" + type->GetName() + ":" + stringify(obj->id)); + + /* Get object attrs to clear before updating */ + redis->SendCommand(new Updater(this, type->GetName(), obj->id), args); + } + } + + void OnNotify() anope_override + { + for (std::set<Serializable *>::iterator it = this->updated_items.begin(), it_end = this->updated_items.end(); it != it_end; ++it) + { + Serializable *s = *it; + + this->InsertObject(s); + } + + this->updated_items.clear(); + } + + void OnReload(Configuration::Conf *conf) anope_override + { + Configuration::Block *block = conf->GetModule(this); + this->redis = ServiceReference<Provider>("Redis::Provider", block->Get<const Anope::string>("engine", "redis/main")); + } + + EventReturn OnLoadDatabase() anope_override + { + const std::vector<Anope::string> type_order = Serialize::Type::GetTypeOrder(); + for (unsigned i = 0; i < type_order.size(); ++i) + { + Serialize::Type *sb = Serialize::Type::Find(type_order[i]); + this->OnSerializeTypeCreate(sb); + } + + while (redis->BlockAndProcess()); + + redis->Subscribe(&this->sl, "__keyspace@*__:hash:*"); + + return EVENT_STOP; + } + + void OnSerializeTypeCreate(Serialize::Type *sb) anope_override + { + if (!redis) + return; + + std::vector<Anope::string> args; + args.push_back("SMEMBERS"); + args.push_back("ids:" + sb->GetName()); + + redis->SendCommand(new TypeLoader(this, sb->GetName()), args); + } + + void OnSerializableConstruct(Serializable *obj) anope_override + { + this->updated_items.insert(obj); + this->Notify(); + } + + void OnSerializableDestruct(Serializable *obj) anope_override + { + Serialize::Type *type = obj->GetSerializableType(); + + std::vector<Anope::string> args; + args.push_back("HGETALL"); + args.push_back("hash:" + type->GetName() + ":" + stringify(obj->id)); + + /* Get all of the attributes for this object */ + redis->SendCommand(new Deleter(this, type->GetName(), obj->id), args); + + this->updated_items.erase(obj); + type->objects.erase(obj->id); + this->Notify(); + } + + void OnSerializableUpdate(Serializable *obj) anope_override + { + this->updated_items.insert(obj); + this->Notify(); + } +}; + +void TypeLoader::OnResult(const Reply &r) +{ + if (r.type != Reply::MULTI_BULK || !me->redis) + { + delete this; + return; + } + + for (unsigned i = 0; i < r.multi_bulk.size(); ++i) + { + const Reply &reply = r.multi_bulk[i]; + + if (reply.type != Reply::BULK) + continue; + + int64_t i; + try + { + i = convertTo<int64_t>(reply.bulk); + } + catch (const ConvertException &) + { + continue; + } + + std::vector<Anope::string> args; + args.push_back("HGETALL"); + args.push_back("hash:" + this->type + ":" + stringify(i)); + + me->redis->SendCommand(new ObjectLoader(me, this->type, i), args); + } + + delete this; +} + +void ObjectLoader::OnResult(const Reply &r) +{ + Serialize::Type *st = Serialize::Type::Find(this->type); + + if (r.type != Reply::MULTI_BULK || r.multi_bulk.empty() || !me->redis || !st) + { + delete this; + return; + } + + Data data; + + for (unsigned i = 0; i + 1 < r.multi_bulk.size(); i += 2) + { + const Reply &key = r.multi_bulk[i], + &value = r.multi_bulk[i + 1]; + + data[key.bulk] << value.bulk; + } + + Serializable* &obj = st->objects[this->id]; + obj = st->Unserialize(obj, data); + if (obj) + { + obj->id = this->id; + obj->UpdateCache(data); + } + + delete this; +} + +void IDInterface::OnResult(const Reply &r) +{ + if (!o || r.type != Reply::INT || !r.i) + { + delete this; + return; + } + + Serializable* &obj = o->GetSerializableType()->objects[r.i]; + if (obj) + /* This shouldn't be possible */ + obj->id = 0; + + o->id = r.i; + obj = o; + + /* Now that we have the id, insert this object for real */ + anope_dynamic_static_cast<DatabaseRedis *>(this->owner)->InsertObject(o); + + delete this; +} + +void Deleter::OnResult(const Reply &r) +{ + if (r.type != Reply::MULTI_BULK || !me->redis || r.multi_bulk.empty()) + { + delete this; + return; + } + + /* Transaction start */ + me->redis->StartTransaction(); + + std::vector<Anope::string> args; + args.push_back("DEL"); + args.push_back("hash:" + this->type + ":" + stringify(this->id)); + + /* Delete hash object */ + me->redis->SendCommand(NULL, args); + + args.clear(); + args.push_back("SREM"); + args.push_back("ids:" + this->type); + args.push_back(stringify(this->id)); + + /* Delete id from ids set */ + me->redis->SendCommand(NULL, args); + + for (unsigned i = 0; i + 1 < r.multi_bulk.size(); i += 2) + { + const Reply &key = r.multi_bulk[i], + &value = r.multi_bulk[i + 1]; + + args.clear(); + args.push_back("SREM"); + args.push_back("value:" + this->type + ":" + key.bulk + ":" + value.bulk); + args.push_back(stringify(this->id)); + + /* Delete value -> object id */ + me->redis->SendCommand(NULL, args); + } + + /* Transaction end */ + me->redis->CommitTransaction(); + + delete this; +} + +void Updater::OnResult(const Reply &r) +{ + Serialize::Type *st = Serialize::Type::Find(this->type); + + if (!st) + { + delete this; + return; + } + + Serializable *obj = st->objects[this->id]; + if (!obj) + { + delete this; + return; + } + + Data data; + obj->Serialize(data); + + /* Transaction start */ + me->redis->StartTransaction(); + + for (unsigned i = 0; i + 1 < r.multi_bulk.size(); i += 2) + { + const Reply &key = r.multi_bulk[i], + &value = r.multi_bulk[i + 1]; + + std::vector<Anope::string> args; + args.push_back("SREM"); + args.push_back("value:" + this->type + ":" + key.bulk + ":" + value.bulk); + args.push_back(stringify(this->id)); + + /* Delete value -> object id */ + me->redis->SendCommand(NULL, args); + } + + /* Add object id to id set for this type */ + std::vector<Anope::string> args; + args.push_back("SADD"); + args.push_back("ids:" + this->type); + args.push_back(stringify(obj->id)); + me->redis->SendCommand(NULL, args); + + args.clear(); + args.push_back("HMSET"); + args.push_back("hash:" + this->type + ":" + stringify(obj->id)); + + typedef std::map<Anope::string, std::stringstream *> items; + for (items::iterator it = data.data.begin(), it_end = data.data.end(); it != it_end; ++it) + { + const Anope::string &key = it->first; + std::stringstream *value = it->second; + + args.push_back(key); + args.push_back(value->str()); + + std::vector<Anope::string> args2; + + args2.push_back("SADD"); + args2.push_back("value:" + this->type + ":" + key + ":" + value->str()); + args2.push_back(stringify(obj->id)); + + /* Add to value -> object id set */ + me->redis->SendCommand(NULL, args2); + } + + ++obj->redis_ignore; + + /* Add object */ + me->redis->SendCommand(NULL, args); + + /* Transaction end */ + me->redis->CommitTransaction(); + + delete this; +} + +void SubscriptionListener::OnResult(const Reply &r) +{ + /* + * [May 15 13:59:35.645839 2013] Debug: pmessage + * [May 15 13:59:35.645866 2013] Debug: __keyspace@*__:anope:hash:* + * [May 15 13:59:35.645880 2013] Debug: __keyspace@0__:anope:hash:type:id + * [May 15 13:59:35.645893 2013] Debug: hset + */ + if (r.multi_bulk.size() != 4) + return; + + size_t sz = r.multi_bulk[2].bulk.find(':'); + if (sz == Anope::string::npos) + return; + + const Anope::string &key = r.multi_bulk[2].bulk.substr(sz + 1), + &op = r.multi_bulk[3].bulk; + + sz = key.rfind(':'); + if (sz == Anope::string::npos) + return; + + const Anope::string &id = key.substr(sz + 1); + + size_t sz2 = key.rfind(':', sz - 1); + if (sz2 == Anope::string::npos) + return; + const Anope::string &type = key.substr(sz2 + 1, sz - sz2 - 1); + + Serialize::Type *s_type = Serialize::Type::Find(type); + + if (s_type == NULL) + return; + + uint64_t obj_id; + try + { + obj_id = convertTo<uint64_t>(id); + } + catch (const ConvertException &) + { + return; + } + + if (op == "hset" || op == "hdel") + { + Serializable *s = s_type->objects[obj_id]; + + if (s && s->redis_ignore) + { + --s->redis_ignore; + Log(LOG_DEBUG) << "redis: notify: got modify for object id " << obj_id << " of type " << type << ", but I am ignoring it"; + } + else + { + Log(LOG_DEBUG) << "redis: notify: got modify for object id " << obj_id << " of type " << type; + + std::vector<Anope::string> args; + args.push_back("HGETALL"); + args.push_back("hash:" + type + ":" + id); + + me->redis->SendCommand(new ModifiedObject(me, type, obj_id), args); + } + } + else if (op == "del") + { + Serializable* &s = s_type->objects[obj_id]; + if (s == NULL) + return; + + Log(LOG_DEBUG) << "redis: notify: deleting object id " << obj_id << " of type " << type; + + Data data; + + s->Serialize(data); + + /* Transaction start */ + me->redis->StartTransaction(); + + typedef std::map<Anope::string, std::stringstream *> items; + for (items::iterator it = data.data.begin(), it_end = data.data.end(); it != it_end; ++it) + { + const Anope::string &key = it->first; + std::stringstream *value = it->second; + + std::vector<Anope::string> args; + args.push_back("SREM"); + args.push_back("value:" + type + ":" + key + ":" + value->str()); + args.push_back(id); + + /* Delete value -> object id */ + me->redis->SendCommand(NULL, args); + } + + std::vector<Anope::string> args; + args.push_back("SREM"); + args.push_back("ids:" + type); + args.push_back(stringify(s->id)); + + /* Delete object from id set */ + me->redis->SendCommand(NULL, args); + + /* Transaction end */ + me->redis->CommitTransaction(); + + delete s; + s = NULL; + } +} + +void ModifiedObject::OnResult(const Reply &r) +{ + Serialize::Type *st = Serialize::Type::Find(this->type); + + if (!st) + { + delete this; + return; + } + + Serializable* &obj = st->objects[this->id]; + + /* Transaction start */ + me->redis->StartTransaction(); + + /* Erase old object values */ + if (obj) + { + Data data; + + obj->Serialize(data); + + typedef std::map<Anope::string, std::stringstream *> items; + for (items::iterator it = data.data.begin(), it_end = data.data.end(); it != it_end; ++it) + { + const Anope::string &key = it->first; + std::stringstream *value = it->second; + + std::vector<Anope::string> args; + args.push_back("SREM"); + args.push_back("value:" + st->GetName() + ":" + key + ":" + value->str()); + args.push_back(stringify(this->id)); + + /* Delete value -> object id */ + me->redis->SendCommand(NULL, args); + } + } + + Data data; + + for (unsigned i = 0; i + 1 < r.multi_bulk.size(); i += 2) + { + const Reply &key = r.multi_bulk[i], + &value = r.multi_bulk[i + 1]; + + data[key.bulk] << value.bulk; + } + + obj = st->Unserialize(obj, data); + if (obj) + { + obj->id = this->id; + obj->UpdateCache(data); + + /* Insert new object values */ + typedef std::map<Anope::string, std::stringstream *> items; + for (items::iterator it = data.data.begin(), it_end = data.data.end(); it != it_end; ++it) + { + const Anope::string &key = it->first; + std::stringstream *value = it->second; + + std::vector<Anope::string> args; + args.push_back("SADD"); + args.push_back("value:" + st->GetName() + ":" + key + ":" + value->str()); + args.push_back(stringify(obj->id)); + + /* Add to value -> object id set */ + me->redis->SendCommand(NULL, args); + } + + std::vector<Anope::string> args; + args.push_back("SADD"); + args.push_back("ids:" + st->GetName()); + args.push_back(stringify(obj->id)); + + /* Add to type -> id set */ + me->redis->SendCommand(NULL, args); + } + + /* Transaction end */ + me->redis->CommitTransaction(); + + delete this; +} + +MODULE_INIT(DatabaseRedis) diff --git a/modules/database/db_sql_live.cpp b/modules/database/db_sql_live.cpp index 1a53f98d0..992a15471 100644 --- a/modules/database/db_sql_live.cpp +++ b/modules/database/db_sql_live.cpp @@ -188,7 +188,7 @@ class DBMySQL : public Module, public Pipe if (res.Get(i, "timestamp").empty()) { clear_null = true; - std::map<unsigned int, Serializable *>::iterator it = obj->objects.find(id); + std::map<uint64_t, Serializable *>::iterator it = obj->objects.find(id); if (it != obj->objects.end()) delete it->second; // This also removes this object from the map } @@ -200,7 +200,7 @@ class DBMySQL : public Module, public Pipe data[it->first] << it->second; Serializable *s = NULL; - std::map<unsigned int, Serializable *>::iterator it = obj->objects.find(id); + std::map<uint64_t, Serializable *>::iterator it = obj->objects.find(id); if (it != obj->objects.end()) s = it->second; diff --git a/modules/m_redis.cpp b/modules/m_redis.cpp new file mode 100644 index 000000000..76cb98d2e --- /dev/null +++ b/modules/m_redis.cpp @@ -0,0 +1,620 @@ +/* + * + * (C) 2003-2013 Anope Team + * Contact us at team@anope.org + * + * Please read COPYING and README for further details. + * + * + */ + +#include "module.h" +#include "modules/redis.h" + +using namespace Redis; + +class MyRedisService; + +class RedisSocket : public BinarySocket, public ConnectionSocket +{ + size_t ParseReply(Reply &r, const char *buf, size_t l); + public: + MyRedisService *provider; + std::deque<Interface *> interfaces; + std::map<Anope::string, Interface *> subinterfaces; + + RedisSocket(MyRedisService *pro, bool ipv6) : Socket(-1, ipv6), provider(pro) { } + + ~RedisSocket(); + + void OnConnect() anope_override; + void OnError(const Anope::string &error) anope_override; + + bool Read(const char *buffer, size_t l) anope_override; +}; + +class Transaction : public Interface +{ + public: + std::deque<Interface *> interfaces; + + Transaction(Module *creator) : Interface(creator) { } + + ~Transaction() + { + for (unsigned i = 0; i < interfaces.size(); ++i) + { + Interface *inter = interfaces[i]; + + if (!inter) + continue; + + inter->OnError("Interface going away"); + } + } + + void OnResult(const Reply &r) anope_override + { + /* This is a multi bulk reply of the results of the queued commands + * in this transaction + */ + + Log(LOG_DEBUG_2) << "redis: transaction complete with " << r.multi_bulk.size() << " results"; + + for (unsigned i = 0; i < r.multi_bulk.size(); ++i) + { + const Reply &reply = r.multi_bulk[i]; + + if (interfaces.empty()) + break; + + Interface *i = interfaces.front(); + interfaces.pop_front(); + + if (i) + i->OnResult(reply); + } + } +}; + +class MyRedisService : public Provider +{ + public: + Anope::string host; + int port; + unsigned db; + + RedisSocket *sock, *sub; + + Transaction ti; + bool in_transaction; + + MyRedisService(Module *c, const Anope::string &n, const Anope::string &h, int p, unsigned d) : Provider(c, n), host(h), port(p), db(d), sock(NULL), sub(NULL), + ti(c), in_transaction(false) + { + sock = new RedisSocket(this, host.find(':') != Anope::string::npos); + sock->Connect(host, port); + + sub = new RedisSocket(this, host.find(':') != Anope::string::npos); + sub->Connect(host, port); + } + + ~MyRedisService() + { + if (sock) + { + sock->flags[SF_DEAD] = true; + sock->provider = NULL; + } + + if (sub) + { + sub->flags[SF_DEAD] = true; + sub->provider = NULL; + } + } + + private: + inline void Pack(std::vector<char> &buffer, const char *buf, size_t sz = 0) + { + if (!sz) + sz = strlen(buf); + + size_t old_size = buffer.size(); + buffer.resize(old_size + sz); + std::copy(buf, buf + sz, buffer.begin() + old_size); + } + + void Send(RedisSocket *s, Interface *i, const std::vector<std::pair<const char *, size_t> > &args) + { + std::vector<char> buffer; + + Pack(buffer, "*"); + Pack(buffer, stringify(args.size()).c_str()); + Pack(buffer, "\r\n"); + + for (unsigned j = 0; j < args.size(); ++j) + { + const std::pair<const char *, size_t> &pair = args[j]; + + Pack(buffer, "$"); + Pack(buffer, stringify(pair.second).c_str()); + Pack(buffer, "\r\n"); + + Pack(buffer, pair.first, pair.second); + Pack(buffer, "\r\n"); + } + + if (buffer.empty()) + return; + + s->Write(&buffer[0], buffer.size()); + if (in_transaction) + { + ti.interfaces.push_back(i); + s->interfaces.push_back(NULL); // For the +Queued response + } + else + s->interfaces.push_back(i); + } + + public: + void SendCommand(RedisSocket *s, Interface *i, const std::vector<Anope::string> &cmds) + { + std::vector<std::pair<const char *, size_t> > args; + for (unsigned j = 0; j < cmds.size(); ++j) + args.push_back(std::make_pair(cmds[j].c_str(), cmds[j].length())); + this->Send(s, i, args); + } + + void SendCommand(RedisSocket *s, Interface *i, const Anope::string &str) + { + std::vector<Anope::string> args; + spacesepstream(str).GetTokens(args); + this->SendCommand(s, i, args); + } + + void Send(Interface *i, const std::vector<std::pair<const char *, size_t> > &args) anope_override + { + if (!sock) + { + sock = new RedisSocket(this, host.find(':') != Anope::string::npos); + sock->Connect(host, port); + } + + this->Send(sock, i, args); + } + + void SendCommand(Interface *i, const std::vector<Anope::string> &cmds) anope_override + { + std::vector<std::pair<const char *, size_t> > args; + for (unsigned j = 0; j < cmds.size(); ++j) + args.push_back(std::make_pair(cmds[j].c_str(), cmds[j].length())); + this->Send(i, args); + } + + void SendCommand(Interface *i, const Anope::string &str) anope_override + { + std::vector<Anope::string> args; + spacesepstream(str).GetTokens(args); + this->SendCommand(i, args); + } + + bool BlockAndProcess() anope_override + { + this->sock->ProcessWrite(); + this->sock->SetBlocking(true); + this->sock->ProcessRead(); + this->sock->SetBlocking(false); + return !this->sock->interfaces.empty(); + } + + void Subscribe(Interface *i, const Anope::string &pattern) anope_override + { + if (sub == NULL) + { + sub = new RedisSocket(this, host.find(':') != Anope::string::npos); + sub->Connect(host, port); + } + + std::vector<Anope::string> args; + args.push_back("PSUBSCRIBE"); + args.push_back(pattern); + this->SendCommand(sub, NULL, args); + + sub->subinterfaces[pattern] = i; + } + + void Unsubscribe(const Anope::string &pattern) anope_override + { + if (sub) + sub->subinterfaces.erase(pattern); + } + + void StartTransaction() anope_override + { + if (in_transaction) + throw CoreException(); + + this->SendCommand(NULL, "MULTI"); + in_transaction = true; + } + + void CommitTransaction() anope_override + { + /* The result of the transaction comes back to the reply of EXEC as a multi bulk. + * The reply to the individual commands that make up the transaction when executed + * is a simple +QUEUED + */ + in_transaction = false; + this->SendCommand(&this->ti, "EXEC"); + } +}; + +RedisSocket::~RedisSocket() +{ + if (provider) + { + if (provider->sock == this) + provider->sock = NULL; + else if (provider->sub == this) + provider->sub = NULL; + } + + for (unsigned i = 0; i < interfaces.size(); ++i) + { + Interface *inter = interfaces[i]; + + if (!inter) + continue; + + inter->OnError("Interface going away"); + } +} + +void RedisSocket::OnConnect() +{ + Log() << "redis: Successfully connected to " << provider->name << (this == this->provider->sub ? " (sub)" : ""); + + this->provider->SendCommand(NULL, "CLIENT SETNAME Anope"); + this->provider->SendCommand(NULL, "SELECT " + stringify(provider->db)); + + if (this != this->provider->sub) + { + this->provider->SendCommand(this, NULL, "CONFIG SET notify-keyspace-events KA"); + } +} + +void RedisSocket::OnError(const Anope::string &error) +{ + Log() << "redis: Error on " << provider->name << (this == this->provider->sub ? " (sub)" : "") << ": " << error; +} + +size_t RedisSocket::ParseReply(Reply &r, const char *buffer, size_t l) +{ + size_t used = 0; + + if (!l) + return used; + + if (r.type == Reply::MULTI_BULK) + goto multi_bulk_cont; + + switch (*buffer) + { + case '+': + { + Anope::string reason(buffer, 1, l - 1); + size_t nl = reason.find("\r\n"); + Log(LOG_DEBUG_2) << "redis: status ok: " << reason.substr(0, nl); + if (nl != Anope::string::npos) + { + r.type = Reply::OK; + used = 1 + nl + 2; + } + break; + } + case '-': + { + Anope::string reason(buffer, 1, l - 1); + size_t nl = reason.find("\r\n"); + Log(LOG_DEBUG) << "redis: status error: " << reason.substr(0, nl); + if (nl != Anope::string::npos) + { + r.type = Reply::ERROR; + used = 1 + nl + 2; + } + break; + } + case ':': + { + Anope::string ibuf(buffer, 1, l - 1); + size_t nl = ibuf.find("\r\n"); + if (nl != Anope::string::npos) + { + try + { + r.i = convertTo<int64_t>(ibuf.substr(0, nl)); + } + catch (const ConvertException &) { } + + r.type = Reply::INT; + used = 1 + nl + 2; + } + break; + } + case '$': + { + Anope::string reply(buffer + 1, l - 1); + /* This assumes one bulk can always fit in our recv buffer */ + size_t nl = reply.find("\r\n"); + if (nl != Anope::string::npos) + { + int len; + try + { + len = convertTo<int>(reply.substr(0, nl)); + if (len >= 0) + { + if (1 + nl + 2 + len + 2 <= l) + { + used = 1 + nl + 2 + len + 2; + r.bulk = reply.substr(nl + 2, len); + r.type = Reply::BULK; + } + } + else + { + used = 1 + nl + 2 + 2; + r.type = Reply::BULK; + } + } + catch (const ConvertException &) { } + } + break; + } + multi_bulk_cont: + case '*': + { + if (r.type != Reply::MULTI_BULK) + { + Anope::string reply(buffer + 1, l - 1); + size_t nl = reply.find("\r\n"); + if (nl != Anope::string::npos) + { + r.type = Reply::MULTI_BULK; + try + { + r.multi_bulk_size = convertTo<int>(reply.substr(0, nl)); + } + catch (const ConvertException &) { } + + used = 1 + nl + 2; + } + else + break; + } + else if (r.multi_bulk.size() == r.multi_bulk_size) + { + /* This multi bulk is already complete, so check the sub bulks */ + for (unsigned i = 0; i < r.multi_bulk.size(); ++i) + if (r.multi_bulk[i].type == Reply::MULTI_BULK) + ParseReply(r.multi_bulk[i], buffer + used, l - used); + break; + } + + for (int i = r.multi_bulk.size(); i < r.multi_bulk_size; ++i) + { + r.multi_bulk.push_back(Reply()); + size_t u = ParseReply(r.multi_bulk.back(), buffer + used, l - used); + if (!u) + { + Log(LOG_DEBUG) << "redis: ran out of data to parse"; + r.multi_bulk.pop_back(); + break; + } + used += u; + } + break; + } + default: + Log(LOG_DEBUG) << "redis: unknown reply " << *buffer; + } + + return used; +} + +bool RedisSocket::Read(const char *buffer, size_t l) +{ + static std::vector<char> save; + std::vector<char> copy; + + if (!save.empty()) + { + std::copy(buffer, buffer + l, std::back_inserter(save)); + + copy = save; + + buffer = ©[0]; + l = copy.size(); + } + + while (l) + { + static Reply r; + + size_t used = this->ParseReply(r, buffer, l); + if (!used) + { + Log(LOG_DEBUG) << "redis: used == 0 ?"; + r.Clear(); + break; + } + else if (used > l) + { + Log(LOG_DEBUG) << "redis: used > l ?"; + r.Clear(); + break; + } + + /* Full result is not here yet */ + if (r.type == Reply::MULTI_BULK && static_cast<unsigned>(r.multi_bulk_size) != r.multi_bulk.size()) + { + buffer += used; + l -= used; + break; + } + + if (this == provider->sub) + { + if (r.multi_bulk.size() == 4) + { + /* pmessage + * pattern subscribed to + * __keyevent@0__:set + * key + */ + std::map<Anope::string, Interface *>::iterator it = this->subinterfaces.find(r.multi_bulk[1].bulk); + if (it != this->subinterfaces.end()) + it->second->OnResult(r); + } + } + else + { + if (this->interfaces.empty()) + { + Log(LOG_DEBUG) << "redis: no interfaces?"; + } + else + { + Interface *i = this->interfaces.front(); + this->interfaces.pop_front(); + + if (i) + { + if (r.type != Reply::ERROR) + i->OnResult(r); + else + i->OnError(r.bulk); + } + } + } + + buffer += used; + l -= used; + + r.Clear(); + } + + if (l) + { + save.resize(l); + std::copy(buffer, buffer + l, save.begin()); + } + else + std::vector<char>().swap(save); + + return true; +} + + +class ModuleRedis : public Module +{ + std::map<Anope::string, MyRedisService *> services; + + public: + ModuleRedis(const Anope::string &modname, const Anope::string &creator) : Module(modname, creator, EXTRA | VENDOR) + { + Implementation i[] = { I_OnReload, I_OnModuleUnload }; + ModuleManager::Attach(i, this, sizeof(i) / sizeof(Implementation)); + } + + ~ModuleRedis() + { + for (std::map<Anope::string, MyRedisService *>::iterator it = services.begin(); it != services.end(); ++it) + { + MyRedisService *p = it->second; + + delete p->sock; + p->sock = NULL; + delete p->sub; + p->sub = NULL; + + delete p; + } + } + + void OnReload(Configuration::Conf *conf) anope_override + { + Configuration::Block *block = conf->GetModule(this); + std::vector<Anope::string> new_services; + + for (int i = 0; i < block->CountBlock("redis"); ++i) + { + Configuration::Block *redis = block->GetBlock("redis", i); + + const Anope::string &name = redis->Get<const Anope::string>("name"), + &ip = redis->Get<const Anope::string>("ip"); + int port = redis->Get<int>("port"); + unsigned db = redis->Get<unsigned>("db"); + + delete services[name]; + services[name] = new MyRedisService(this, name, ip, port, db); + new_services.push_back(name); + } + + for (std::map<Anope::string, MyRedisService *>::iterator it = services.begin(); it != services.end();) + { + Provider *p = it->second; + ++it; + + if (std::find(new_services.begin(), new_services.end(), p->name) == new_services.end()) + delete it->second; + } + } + + void OnModuleUnload(User *, Module *m) anope_override + { + for (std::map<Anope::string, MyRedisService *>::iterator it = services.begin(); it != services.end(); ++it) + { + MyRedisService *p = it->second; + + if (p->sock) + for (unsigned i = p->sock->interfaces.size(); i > 0; --i) + { + Interface *inter = p->sock->interfaces[i - 1]; + + if (inter && inter->owner == m) + { + inter->OnError(m->name + " being unloaded"); + p->sock->interfaces.erase(p->sock->interfaces.begin() + i - 1); + } + } + + if (p->sub) + for (unsigned i = p->sub->interfaces.size(); i > 0; --i) + { + Interface *inter = p->sub->interfaces[i - 1]; + + if (inter && inter->owner == m) + { + inter->OnError(m->name + " being unloaded"); + p->sub->interfaces.erase(p->sub->interfaces.begin() + i - 1); + } + } + + for (unsigned i = p->ti.interfaces.size(); i > 0; --i) + { + Interface *inter = p->ti.interfaces[i - 1]; + + if (inter && inter->owner == m) + { + inter->OnError(m->name + " being unloaded"); + p->ti.interfaces.erase(p->ti.interfaces.begin() + i - 1); + } + } + } + } +}; + +MODULE_INIT(ModuleRedis) diff --git a/src/serialize.cpp b/src/serialize.cpp index 444cf79da..bb95cdc41 100644 --- a/src/serialize.cpp +++ b/src/serialize.cpp @@ -44,7 +44,7 @@ void Serialize::CheckTypes() } } -Serializable::Serializable(const Anope::string &serialize_type) : last_commit(0), last_commit_time(0), id(0) +Serializable::Serializable(const Anope::string &serialize_type) : last_commit(0), last_commit_time(0), id(0), redis_ignore(0) { if (SerializableItems == NULL) SerializableItems = new std::list<Serializable *>(); @@ -58,7 +58,7 @@ Serializable::Serializable(const Anope::string &serialize_type) : last_commit(0) FOREACH_MOD(I_OnSerializableConstruct, OnSerializableConstruct(this)); } -Serializable::Serializable(const Serializable &other) : last_commit(0), last_commit_time(0), id(0) +Serializable::Serializable(const Serializable &other) : last_commit(0), last_commit_time(0), id(0), redis_ignore(0) { SerializableItems->push_back(this); this->s_iter = SerializableItems->end(); diff --git a/src/socket_clients.cpp b/src/socket_clients.cpp index 3686b2c0a..7893a07a9 100644 --- a/src/socket_clients.cpp +++ b/src/socket_clients.cpp @@ -17,10 +17,6 @@ #include <errno.h> -ConnectionSocket::ConnectionSocket() : Socket() -{ -} - void ConnectionSocket::Connect(const Anope::string &TargetHost, int Port) { this->io->Connect(this, TargetHost, Port); |