summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAdam <Adam@anope.org>2013-05-17 23:04:18 -0400
committerAdam <Adam@anope.org>2013-05-17 23:04:18 -0400
commit2428264315868f0860f9747c8b005536e5442db6 (patch)
tree9cd3070f46823ea5f5af32abf84d3c39975e9634
parentcc4a14b0badfe3d617ec2dd230d7921f8650a069 (diff)
Add Redis database support
-rw-r--r--data/example.conf30
-rw-r--r--data/modules.example.conf25
-rw-r--r--docs/REDIS160
-rw-r--r--include/modules.h2
-rw-r--r--include/modules/redis.h62
-rw-r--r--include/serialize.h7
-rw-r--r--include/sockets.h4
-rw-r--r--modules/database/db_redis.cpp648
-rw-r--r--modules/database/db_sql_live.cpp4
-rw-r--r--modules/m_redis.cpp620
-rw-r--r--src/serialize.cpp4
-rw-r--r--src/socket_clients.cpp4
12 files changed, 1553 insertions, 17 deletions
diff --git a/data/example.conf b/data/example.conf
index f1433b7d1..aea4c07f3 100644
--- a/data/example.conf
+++ b/data/example.conf
@@ -22,7 +22,7 @@
* }
*
* Note that nameless blocks are allowed and are often used with comments to allow
- * easialy commenting an entire block, for example:
+ * easily commenting an entire block, for example:
* #foobar
* {
* moo = "cow"
@@ -1082,7 +1082,7 @@ mail
#module { name = "db_plain" }
/*
- * db_flatfile
+ * [RECOMMENDED] db_flatfile
*
* This is the default flatfile database format.
*/
@@ -1124,6 +1124,11 @@ module
* This is only useful with very large databases, with hundreds
* of thousands of objects, that have a noticeable delay from
* writing databases.
+ *
+ * If your database is large enough cause a noticible delay when
+ * saving you should consider a more powerful alternative such
+ * as db_sql or db_redis, which incrementally update their
+ * databases asynchronously in real time.
*/
fork = no
}
@@ -1173,6 +1178,27 @@ module
}
/*
+ * db_redis.
+ *
+ * This module allows using Redis (http://redis.io) as a database backend.
+ * This module requires that m_redis is loaded and configured properly.
+ *
+ * Redis 2.8 supports keyspace notifications which allows Redis to push notifications
+ * to Anope about outside modifications to the database. This module supports this and
+ * will internally reflect any changes made to the database immediately once notified.
+ * See docs/REDIS for more information regarding this.
+ */
+#module
+{
+ name = "db_redis"
+
+ /*
+ * Redis database to use. This must be configured with m_redis.
+ */
+ engine = "redis/main"
+}
+
+/*
* [RECOMMENDED] Encryption modules.
*
* The encryption modules are used when dealing with passwords. This determines how
diff --git a/data/modules.example.conf b/data/modules.example.conf
index 01730ce29..2129d3ac9 100644
--- a/data/modules.example.conf
+++ b/data/modules.example.conf
@@ -504,6 +504,31 @@ module { name = "help" }
}
/*
+ * m_redis
+ *
+ * This module allows other modules to use Redis.
+ */
+#module
+{
+ name = "m_redis"
+
+ /* A redis database */
+ redis
+ {
+ /* The name of this service */
+ name = "redis/main"
+
+ /*
+ * The redis database to use. New connections default to 0.
+ */
+ db = 0
+
+ ip = "127.0.0.1"
+ port = 6379
+ }
+}
+
+/*
* m_regex_pcre
*
* Provides the regex engine regex/pcre, which uses the Perl Compatible Regular Expressions library.
diff --git a/docs/REDIS b/docs/REDIS
new file mode 100644
index 000000000..8c40d5690
--- /dev/null
+++ b/docs/REDIS
@@ -0,0 +1,160 @@
+Starting in Anope 1.9.9, Anope has Redis database support (http://redis.io/).
+This document explains the data structure used by Anope, and explains how
+keyspace notification works.
+
+This is not a tutorial on how to use Redis, see http://redis.io/documentation
+for that.
+
+Table of Contents
+-----------------
+1) Data structure
+2) Keyspace notifications
+3) Examples of modfying, deleting, and creating objects
+
+1) Data structure
+
+ There are 4 key namespaces in Anope, they are:
+
+ id - The keys in id are used to atomically create object ids for new
+ objects. For example, if I were to create a new BotInfo I would first:
+
+ redis 127.0.0.1:6379> INCR id:BotInfo
+
+ To get the object ID of the new object.
+
+ ids - The keys in ids contain a set of all object ids of the given type.
+ For example:
+
+ redis 127.0.0.1:6379> SMEMBERS ids:BotInfo
+
+ Returns "1", "2", "3", "4", "5", "6", "7", "8" because I have 8 bots that
+ have IDs 1, 2, 3, 4, 5, 6, 7, and 8, respectively.
+
+ hash - The keys in hash are the actual objects, stored as hashes. For
+ example, if I had just looked up all BotInfo ids and wanted to iterate
+ over all of them, I woulld start by:
+
+ redis 127.0.0.1:6379> HGETALL hash:BotInfo:1
+
+ Which gets all keys and values from the hash of type BotInfo with id 1.
+ This may return:
+
+ "nick" -> "BotServ"
+ "user" -> "services"
+ "host" -> "services.anope.org"
+ "created" -> "1368704765"
+
+ value - The keys in value only exist to aid looking up object IDs. They
+ are sets of object IDs and are used to map key+value pairs to objects.
+ For example:
+
+ redis 127.0.0.1:6379> SMEMBERS value:NickAlias:nick:Adam
+
+ Returns a set of object ids of NickAlias objects that have the key
+ 'nick' set to the value 'Adam' in its hash. Clearly this can only
+ ever contain at most one object, since it is not possible to have
+ more than one registered nick with the same name, but other keys
+ will contain more than one, such as:
+
+ redis 127.0.0.1:6379> SMEMBERS value:NickCore:email:adam@anope.org
+
+ Which would return all accounts with the email "adam@anope.org".
+
+ redis 127.0.0.1:6379> SMEMBERS value:ChanAccess:mask:Adam
+
+ Which would return all access entries set on the account "Adam".
+
+ Behavior similar to SQL's AND, can be achieved using the
+ SINTER command, which does set intersection on one or more sets.
+
+2) Keyspace notifications
+
+ Redis 2.7 (unstable) and 2.8 (stable) and newer support keyspace notifications
+ (http://redis.io/topics/notifications). This allows Redis to notify Anope of
+ any external changes to objects in the database. Once notified, Anope will
+ immediately update the object. Otherwise, Anope keeps all objects in memory
+ and will not regularly read from the databaes once started.
+
+ You can use this to modify objects in Redis and have them immediately reflected
+ back into Anope. Additionally you can use this feature to run multiple Anope
+ instances simultaneously from the same database (see also, Redis database
+ replication).
+
+ To use keyspace notifications you MUST execute
+
+ redis 127.0.0.1:6379> CONFIG SET notify-keyspace-events KA
+ OK
+
+ or set notify-keyspace-events in redis.conf properly. Anope always executes
+ CONFIG SET when it first connects.
+
+ If you do not enable keyspace events properly Anope will be UNABLE to see any
+ object modifications you do.
+
+ The key space ids and value are managed entirely by Anope, you do
+ not (and should not) modify them. Once you modify the object (hash), Anope will
+ update them for you to correctly refelect any changes made to the object.
+
+ Finally, always use atomic operations. If you are inserting a new object with
+ multiple commands, or inserting multiple objects at once, specifically if the
+ objects depend on each other, you MUST use a transaction.
+
+3) Examples of modfying, deleting, and creating objects
+
+ These examples will ONLY work if you meet the criteria in section 2.
+
+ If I want to change the email account 'Adam' to 'Adam@anope.org', I would execute the following:
+
+ redis 127.0.0.1:6379> SMEMBERS value:NickCore:display:Adam
+
+ Which returns a value of "1", which is the object id I want to modify.
+ Now to change the email:
+
+ redis 127.0.0.1:6379> HSET hash:NickCore:1 email Adam@anope.org
+
+ You can now see this in NickServ's INFO command:
+ -NickServ- Email address: Adam@anope.org
+
+ If I want to drop the account "Adam", I would execute the following:
+
+ redis 127.0.0.1:6379> SMEMBERS value:NickCore:display:Adam
+
+ Which returns a value of "1". I would then check:
+
+ redis 127.0.0.1:6379> SMEMBERS value:NickAlias:nc:Adam
+
+ To see what nicknames depend on this account to exist, as I will
+ have to remove those too. This returns the values "2", and "3".
+
+ Finally, I can drop the nick using a transaction via:
+
+ redis 127.0.0.1:6379> MULTI
+ OK
+ redis 127.0.0.1:6379> DEL hash:NickAlias:2
+ QUEUED
+ redis 127.0.0.1:6379> DEL hash:NickAlias:3
+ QUEUED
+ redis 127.0.0.1:6379> DEL hash:NickCore:1
+ QUEUED
+ redis 127.0.0.1:6379> EXEC
+
+ Or alternatively simply:
+
+ redis 127.0.0.1:6379> DEL hash:NickAlias:2 hash:NickAlias:3 hash:NickCore:1
+
+ If I wanted to create a BotServ bot, I would execute the following:
+
+ redis 127.0.0.1:6379> INCR id:BotInfo
+
+ Which returns a new object ID for me, in this example it will be "8".
+ Now I can create the object:
+
+ HMSET hash:BotInfo:8 nick redis user redis host services.anope.org realname "Services for IRC Networks"
+
+ Note if you are using HSET instead of HMSET you will need to use a transaction, as shown in the above example.
+ If you are watching your services logs you will immediatly see:
+
+ USERS: redis!redis@services.anope.org (Services for IRC Networks) connected to the network (services.anope.org)
+
+ And the bot redis will be in BotServ's bot list.
+ Notice how ids:BotInfo and the value keys are updated automatically.
diff --git a/include/modules.h b/include/modules.h
index 5585e9e77..31b09c2cf 100644
--- a/include/modules.h
+++ b/include/modules.h
@@ -368,7 +368,7 @@ class CoreExport Module : public Extensible
virtual EventReturn OnSaveDatabase() { return EVENT_CONTINUE; }
/** Called when the databases are loaded
- * @return EVENT_CONTINUE to let other modules continue saving, EVENT_STOP to stop
+ * @return EVENT_CONTINUE to let other modules continue loading, EVENT_STOP to stop
*/
virtual EventReturn OnLoadDatabase() { return EVENT_CONTINUE; }
diff --git a/include/modules/redis.h b/include/modules/redis.h
new file mode 100644
index 000000000..4b321a608
--- /dev/null
+++ b/include/modules/redis.h
@@ -0,0 +1,62 @@
+/*
+ *
+ * (C) 2003-2013 Anope Team
+ * Contact us at team@anope.org
+ *
+ * Please read COPYING and README for further details.
+ *
+ */
+
+namespace Redis
+{
+ struct Reply
+ {
+ enum Type
+ {
+ NOT_PARSED,
+ ERROR,
+ OK,
+ INT,
+ BULK,
+ MULTI_BULK
+ }
+ type;
+
+ Reply() { Clear(); }
+ void Clear() { type = NOT_PARSED; i = 0; bulk.clear(); multi_bulk_size = 0; multi_bulk.clear(); }
+
+ int64_t i;
+ Anope::string bulk;
+ int multi_bulk_size;
+ std::deque<Reply> multi_bulk;
+ };
+
+ class Interface
+ {
+ public:
+ Module *owner;
+
+ Interface(Module *m) : owner(m) { }
+
+ virtual void OnResult(const Reply &r) = 0;
+ virtual void OnError(const Anope::string &error) { Log(owner) << error; }
+ };
+
+ class Provider : public Service
+ {
+ public:
+ Provider(Module *c, const Anope::string &n) : Service(c, "Redis::Provider", n) { }
+
+ virtual void SendCommand(Interface *i, const std::vector<Anope::string> &cmds) = 0;
+ virtual void SendCommand(Interface *i, const Anope::string &str) = 0;
+
+ virtual bool BlockAndProcess() = 0;
+
+ virtual void Subscribe(Interface *i, const Anope::string &pattern) = 0;
+ virtual void Unsubscribe(const Anope::string &pattern) = 0;
+
+ virtual void StartTransaction() = 0;
+ virtual void CommitTransaction() = 0;
+ };
+}
+
diff --git a/include/serialize.h b/include/serialize.h
index a43046b1e..472701932 100644
--- a/include/serialize.h
+++ b/include/serialize.h
@@ -80,7 +80,10 @@ class CoreExport Serializable : public virtual Base
virtual ~Serializable();
/* Unique ID (per type, not globally) for this object */
- unsigned int id;
+ uint64_t id;
+
+ /* Only used by redis, to ignore updates */
+ unsigned short redis_ignore;
/** Marks the object as potentially being updated "soon".
*/
@@ -129,7 +132,7 @@ class CoreExport Serialize::Type
public:
/* Map of Serializable::id to Serializable objects */
- std::map<unsigned int, Serializable *> objects;
+ std::map<uint64_t, Serializable *> objects;
/** Creates a new serializable type
* @param n Type name
diff --git a/include/sockets.h b/include/sockets.h
index 8b1abe5c2..8b7b78a7f 100644
--- a/include/sockets.h
+++ b/include/sockets.h
@@ -388,10 +388,6 @@ class CoreExport ConnectionSocket : public virtual Socket
/* Sockaddrs for connection ip/port */
sockaddrs conaddr;
- /** Constructor
- */
- ConnectionSocket();
-
/** Connect the socket
* @param TargetHost The target host to connect to
* @param Port The target port to connect to
diff --git a/modules/database/db_redis.cpp b/modules/database/db_redis.cpp
new file mode 100644
index 000000000..03c7dabca
--- /dev/null
+++ b/modules/database/db_redis.cpp
@@ -0,0 +1,648 @@
+/*
+ *
+ * (C) 2003-2013 Anope Team
+ * Contact us at team@anope.org
+ *
+ * Please read COPYING and README for further details.
+ *
+ *
+ */
+
+#include "module.h"
+#include "modules/redis.h"
+
+using namespace Redis;
+
+class DatabaseRedis;
+static DatabaseRedis *me;
+
+class Data : public Serialize::Data
+{
+ public:
+ std::map<Anope::string, std::stringstream *> data;
+
+ ~Data()
+ {
+ for (std::map<Anope::string, std::stringstream *>::iterator it = data.begin(), it_end = data.end(); it != it_end; ++it)
+ delete it->second;
+ }
+
+ std::iostream& operator[](const Anope::string &key) anope_override
+ {
+ std::stringstream* &stream = data[key];
+ if (!stream)
+ stream = new std::stringstream();
+ return *stream;
+ }
+
+ std::set<Anope::string> KeySet() const anope_override
+ {
+ std::set<Anope::string> keys;
+ for (std::map<Anope::string, std::stringstream *>::const_iterator it = this->data.begin(), it_end = this->data.end(); it != it_end; ++it)
+ keys.insert(it->first);
+ return keys;
+ }
+
+ size_t Hash() const anope_override
+ {
+ size_t hash = 0;
+ for (std::map<Anope::string, std::stringstream *>::const_iterator it = this->data.begin(), it_end = this->data.end(); it != it_end; ++it)
+ if (!it->second->str().empty())
+ hash ^= Anope::hash_cs()(it->second->str());
+ return hash;
+ }
+};
+
+class TypeLoader : public Interface
+{
+ Anope::string type;
+ public:
+ TypeLoader(Module *creator, const Anope::string &t) : Interface(creator), type(t) { }
+
+ void OnResult(const Reply &r) anope_override;
+};
+
+class ObjectLoader : public Interface
+{
+ Anope::string type;
+ int64_t id;
+
+ public:
+ ObjectLoader(Module *creator, const Anope::string &t, int64_t i) : Interface(creator), type(t), id(i) { }
+
+ void OnResult(const Reply &r) anope_override;
+};
+
+class IDInterface : public Interface
+{
+ Reference<Serializable> o;
+ public:
+ IDInterface(Module *creator, Serializable *obj) : Interface(creator), o(obj) { }
+
+ void OnResult(const Reply &r) anope_override;
+};
+
+class Deleter : public Interface
+{
+ Anope::string type;
+ int64_t id;
+ public:
+ Deleter(Module *creator, const Anope::string &t, int64_t i) : Interface(creator), type(t), id(i) { }
+
+ void OnResult(const Reply &r) anope_override;
+};
+
+class Updater : public Interface
+{
+ Anope::string type;
+ int64_t id;
+ public:
+ Updater(Module *creator, const Anope::string &t, int64_t i) : Interface(creator), type(t), id(i) { }
+
+ void OnResult(const Reply &r) anope_override;
+};
+
+class ModifiedObject : public Interface
+{
+ Anope::string type;
+ int64_t id;
+ public:
+ ModifiedObject(Module *creator, const Anope::string &t, int64_t i) : Interface(creator), type(t), id(i) { }
+
+ void OnResult(const Reply &r) anope_override;
+};
+
+class SubscriptionListener : public Interface
+{
+ public:
+ SubscriptionListener(Module *creator) : Interface(creator) { }
+
+ void OnResult(const Reply &r) anope_override;
+};
+
+class DatabaseRedis : public Module, public Pipe
+{
+ SubscriptionListener sl;
+ std::set<Serializable *> updated_items;
+
+ public:
+ ServiceReference<Provider> redis;
+
+ DatabaseRedis(const Anope::string &modname, const Anope::string &creator) : Module(modname, creator, DATABASE | VENDOR), sl(this)
+ {
+ me = this;
+
+ Implementation i[] = { I_OnReload, I_OnLoadDatabase, I_OnSerializeTypeCreate, I_OnSerializableConstruct, I_OnSerializableDestruct, I_OnSerializableUpdate };
+ ModuleManager::Attach(i, this, sizeof(i) / sizeof(Implementation));
+ }
+
+ /* Insert or update an object */
+ void InsertObject(Serializable *obj)
+ {
+ Serialize::Type *type = obj->GetSerializableType();
+
+ /* If there is no id yet for ths object, get one */
+ if (!obj->id)
+ redis->SendCommand(new IDInterface(this, obj), "INCR id:" + type->GetName());
+ else
+ {
+ Data data;
+ obj->Serialize(data);
+
+ if (obj->IsCached(data))
+ return;
+
+ obj->UpdateCache(data);
+
+ std::vector<Anope::string> args;
+ args.push_back("HGETALL");
+ args.push_back("hash:" + type->GetName() + ":" + stringify(obj->id));
+
+ /* Get object attrs to clear before updating */
+ redis->SendCommand(new Updater(this, type->GetName(), obj->id), args);
+ }
+ }
+
+ void OnNotify() anope_override
+ {
+ for (std::set<Serializable *>::iterator it = this->updated_items.begin(), it_end = this->updated_items.end(); it != it_end; ++it)
+ {
+ Serializable *s = *it;
+
+ this->InsertObject(s);
+ }
+
+ this->updated_items.clear();
+ }
+
+ void OnReload(Configuration::Conf *conf) anope_override
+ {
+ Configuration::Block *block = conf->GetModule(this);
+ this->redis = ServiceReference<Provider>("Redis::Provider", block->Get<const Anope::string>("engine", "redis/main"));
+ }
+
+ EventReturn OnLoadDatabase() anope_override
+ {
+ const std::vector<Anope::string> type_order = Serialize::Type::GetTypeOrder();
+ for (unsigned i = 0; i < type_order.size(); ++i)
+ {
+ Serialize::Type *sb = Serialize::Type::Find(type_order[i]);
+ this->OnSerializeTypeCreate(sb);
+ }
+
+ while (redis->BlockAndProcess());
+
+ redis->Subscribe(&this->sl, "__keyspace@*__:hash:*");
+
+ return EVENT_STOP;
+ }
+
+ void OnSerializeTypeCreate(Serialize::Type *sb) anope_override
+ {
+ if (!redis)
+ return;
+
+ std::vector<Anope::string> args;
+ args.push_back("SMEMBERS");
+ args.push_back("ids:" + sb->GetName());
+
+ redis->SendCommand(new TypeLoader(this, sb->GetName()), args);
+ }
+
+ void OnSerializableConstruct(Serializable *obj) anope_override
+ {
+ this->updated_items.insert(obj);
+ this->Notify();
+ }
+
+ void OnSerializableDestruct(Serializable *obj) anope_override
+ {
+ Serialize::Type *type = obj->GetSerializableType();
+
+ std::vector<Anope::string> args;
+ args.push_back("HGETALL");
+ args.push_back("hash:" + type->GetName() + ":" + stringify(obj->id));
+
+ /* Get all of the attributes for this object */
+ redis->SendCommand(new Deleter(this, type->GetName(), obj->id), args);
+
+ this->updated_items.erase(obj);
+ type->objects.erase(obj->id);
+ this->Notify();
+ }
+
+ void OnSerializableUpdate(Serializable *obj) anope_override
+ {
+ this->updated_items.insert(obj);
+ this->Notify();
+ }
+};
+
+void TypeLoader::OnResult(const Reply &r)
+{
+ if (r.type != Reply::MULTI_BULK || !me->redis)
+ {
+ delete this;
+ return;
+ }
+
+ for (unsigned i = 0; i < r.multi_bulk.size(); ++i)
+ {
+ const Reply &reply = r.multi_bulk[i];
+
+ if (reply.type != Reply::BULK)
+ continue;
+
+ int64_t i;
+ try
+ {
+ i = convertTo<int64_t>(reply.bulk);
+ }
+ catch (const ConvertException &)
+ {
+ continue;
+ }
+
+ std::vector<Anope::string> args;
+ args.push_back("HGETALL");
+ args.push_back("hash:" + this->type + ":" + stringify(i));
+
+ me->redis->SendCommand(new ObjectLoader(me, this->type, i), args);
+ }
+
+ delete this;
+}
+
+void ObjectLoader::OnResult(const Reply &r)
+{
+ Serialize::Type *st = Serialize::Type::Find(this->type);
+
+ if (r.type != Reply::MULTI_BULK || r.multi_bulk.empty() || !me->redis || !st)
+ {
+ delete this;
+ return;
+ }
+
+ Data data;
+
+ for (unsigned i = 0; i + 1 < r.multi_bulk.size(); i += 2)
+ {
+ const Reply &key = r.multi_bulk[i],
+ &value = r.multi_bulk[i + 1];
+
+ data[key.bulk] << value.bulk;
+ }
+
+ Serializable* &obj = st->objects[this->id];
+ obj = st->Unserialize(obj, data);
+ if (obj)
+ {
+ obj->id = this->id;
+ obj->UpdateCache(data);
+ }
+
+ delete this;
+}
+
+void IDInterface::OnResult(const Reply &r)
+{
+ if (!o || r.type != Reply::INT || !r.i)
+ {
+ delete this;
+ return;
+ }
+
+ Serializable* &obj = o->GetSerializableType()->objects[r.i];
+ if (obj)
+ /* This shouldn't be possible */
+ obj->id = 0;
+
+ o->id = r.i;
+ obj = o;
+
+ /* Now that we have the id, insert this object for real */
+ anope_dynamic_static_cast<DatabaseRedis *>(this->owner)->InsertObject(o);
+
+ delete this;
+}
+
+void Deleter::OnResult(const Reply &r)
+{
+ if (r.type != Reply::MULTI_BULK || !me->redis || r.multi_bulk.empty())
+ {
+ delete this;
+ return;
+ }
+
+ /* Transaction start */
+ me->redis->StartTransaction();
+
+ std::vector<Anope::string> args;
+ args.push_back("DEL");
+ args.push_back("hash:" + this->type + ":" + stringify(this->id));
+
+ /* Delete hash object */
+ me->redis->SendCommand(NULL, args);
+
+ args.clear();
+ args.push_back("SREM");
+ args.push_back("ids:" + this->type);
+ args.push_back(stringify(this->id));
+
+ /* Delete id from ids set */
+ me->redis->SendCommand(NULL, args);
+
+ for (unsigned i = 0; i + 1 < r.multi_bulk.size(); i += 2)
+ {
+ const Reply &key = r.multi_bulk[i],
+ &value = r.multi_bulk[i + 1];
+
+ args.clear();
+ args.push_back("SREM");
+ args.push_back("value:" + this->type + ":" + key.bulk + ":" + value.bulk);
+ args.push_back(stringify(this->id));
+
+ /* Delete value -> object id */
+ me->redis->SendCommand(NULL, args);
+ }
+
+ /* Transaction end */
+ me->redis->CommitTransaction();
+
+ delete this;
+}
+
+void Updater::OnResult(const Reply &r)
+{
+ Serialize::Type *st = Serialize::Type::Find(this->type);
+
+ if (!st)
+ {
+ delete this;
+ return;
+ }
+
+ Serializable *obj = st->objects[this->id];
+ if (!obj)
+ {
+ delete this;
+ return;
+ }
+
+ Data data;
+ obj->Serialize(data);
+
+ /* Transaction start */
+ me->redis->StartTransaction();
+
+ for (unsigned i = 0; i + 1 < r.multi_bulk.size(); i += 2)
+ {
+ const Reply &key = r.multi_bulk[i],
+ &value = r.multi_bulk[i + 1];
+
+ std::vector<Anope::string> args;
+ args.push_back("SREM");
+ args.push_back("value:" + this->type + ":" + key.bulk + ":" + value.bulk);
+ args.push_back(stringify(this->id));
+
+ /* Delete value -> object id */
+ me->redis->SendCommand(NULL, args);
+ }
+
+ /* Add object id to id set for this type */
+ std::vector<Anope::string> args;
+ args.push_back("SADD");
+ args.push_back("ids:" + this->type);
+ args.push_back(stringify(obj->id));
+ me->redis->SendCommand(NULL, args);
+
+ args.clear();
+ args.push_back("HMSET");
+ args.push_back("hash:" + this->type + ":" + stringify(obj->id));
+
+ typedef std::map<Anope::string, std::stringstream *> items;
+ for (items::iterator it = data.data.begin(), it_end = data.data.end(); it != it_end; ++it)
+ {
+ const Anope::string &key = it->first;
+ std::stringstream *value = it->second;
+
+ args.push_back(key);
+ args.push_back(value->str());
+
+ std::vector<Anope::string> args2;
+
+ args2.push_back("SADD");
+ args2.push_back("value:" + this->type + ":" + key + ":" + value->str());
+ args2.push_back(stringify(obj->id));
+
+ /* Add to value -> object id set */
+ me->redis->SendCommand(NULL, args2);
+ }
+
+ ++obj->redis_ignore;
+
+ /* Add object */
+ me->redis->SendCommand(NULL, args);
+
+ /* Transaction end */
+ me->redis->CommitTransaction();
+
+ delete this;
+}
+
+void SubscriptionListener::OnResult(const Reply &r)
+{
+ /*
+ * [May 15 13:59:35.645839 2013] Debug: pmessage
+ * [May 15 13:59:35.645866 2013] Debug: __keyspace@*__:anope:hash:*
+ * [May 15 13:59:35.645880 2013] Debug: __keyspace@0__:anope:hash:type:id
+ * [May 15 13:59:35.645893 2013] Debug: hset
+ */
+ if (r.multi_bulk.size() != 4)
+ return;
+
+ size_t sz = r.multi_bulk[2].bulk.find(':');
+ if (sz == Anope::string::npos)
+ return;
+
+ const Anope::string &key = r.multi_bulk[2].bulk.substr(sz + 1),
+ &op = r.multi_bulk[3].bulk;
+
+ sz = key.rfind(':');
+ if (sz == Anope::string::npos)
+ return;
+
+ const Anope::string &id = key.substr(sz + 1);
+
+ size_t sz2 = key.rfind(':', sz - 1);
+ if (sz2 == Anope::string::npos)
+ return;
+ const Anope::string &type = key.substr(sz2 + 1, sz - sz2 - 1);
+
+ Serialize::Type *s_type = Serialize::Type::Find(type);
+
+ if (s_type == NULL)
+ return;
+
+ uint64_t obj_id;
+ try
+ {
+ obj_id = convertTo<uint64_t>(id);
+ }
+ catch (const ConvertException &)
+ {
+ return;
+ }
+
+ if (op == "hset" || op == "hdel")
+ {
+ Serializable *s = s_type->objects[obj_id];
+
+ if (s && s->redis_ignore)
+ {
+ --s->redis_ignore;
+ Log(LOG_DEBUG) << "redis: notify: got modify for object id " << obj_id << " of type " << type << ", but I am ignoring it";
+ }
+ else
+ {
+ Log(LOG_DEBUG) << "redis: notify: got modify for object id " << obj_id << " of type " << type;
+
+ std::vector<Anope::string> args;
+ args.push_back("HGETALL");
+ args.push_back("hash:" + type + ":" + id);
+
+ me->redis->SendCommand(new ModifiedObject(me, type, obj_id), args);
+ }
+ }
+ else if (op == "del")
+ {
+ Serializable* &s = s_type->objects[obj_id];
+ if (s == NULL)
+ return;
+
+ Log(LOG_DEBUG) << "redis: notify: deleting object id " << obj_id << " of type " << type;
+
+ Data data;
+
+ s->Serialize(data);
+
+ /* Transaction start */
+ me->redis->StartTransaction();
+
+ typedef std::map<Anope::string, std::stringstream *> items;
+ for (items::iterator it = data.data.begin(), it_end = data.data.end(); it != it_end; ++it)
+ {
+ const Anope::string &key = it->first;
+ std::stringstream *value = it->second;
+
+ std::vector<Anope::string> args;
+ args.push_back("SREM");
+ args.push_back("value:" + type + ":" + key + ":" + value->str());
+ args.push_back(id);
+
+ /* Delete value -> object id */
+ me->redis->SendCommand(NULL, args);
+ }
+
+ std::vector<Anope::string> args;
+ args.push_back("SREM");
+ args.push_back("ids:" + type);
+ args.push_back(stringify(s->id));
+
+ /* Delete object from id set */
+ me->redis->SendCommand(NULL, args);
+
+ /* Transaction end */
+ me->redis->CommitTransaction();
+
+ delete s;
+ s = NULL;
+ }
+}
+
+void ModifiedObject::OnResult(const Reply &r)
+{
+ Serialize::Type *st = Serialize::Type::Find(this->type);
+
+ if (!st)
+ {
+ delete this;
+ return;
+ }
+
+ Serializable* &obj = st->objects[this->id];
+
+ /* Transaction start */
+ me->redis->StartTransaction();
+
+ /* Erase old object values */
+ if (obj)
+ {
+ Data data;
+
+ obj->Serialize(data);
+
+ typedef std::map<Anope::string, std::stringstream *> items;
+ for (items::iterator it = data.data.begin(), it_end = data.data.end(); it != it_end; ++it)
+ {
+ const Anope::string &key = it->first;
+ std::stringstream *value = it->second;
+
+ std::vector<Anope::string> args;
+ args.push_back("SREM");
+ args.push_back("value:" + st->GetName() + ":" + key + ":" + value->str());
+ args.push_back(stringify(this->id));
+
+ /* Delete value -> object id */
+ me->redis->SendCommand(NULL, args);
+ }
+ }
+
+ Data data;
+
+ for (unsigned i = 0; i + 1 < r.multi_bulk.size(); i += 2)
+ {
+ const Reply &key = r.multi_bulk[i],
+ &value = r.multi_bulk[i + 1];
+
+ data[key.bulk] << value.bulk;
+ }
+
+ obj = st->Unserialize(obj, data);
+ if (obj)
+ {
+ obj->id = this->id;
+ obj->UpdateCache(data);
+
+ /* Insert new object values */
+ typedef std::map<Anope::string, std::stringstream *> items;
+ for (items::iterator it = data.data.begin(), it_end = data.data.end(); it != it_end; ++it)
+ {
+ const Anope::string &key = it->first;
+ std::stringstream *value = it->second;
+
+ std::vector<Anope::string> args;
+ args.push_back("SADD");
+ args.push_back("value:" + st->GetName() + ":" + key + ":" + value->str());
+ args.push_back(stringify(obj->id));
+
+ /* Add to value -> object id set */
+ me->redis->SendCommand(NULL, args);
+ }
+
+ std::vector<Anope::string> args;
+ args.push_back("SADD");
+ args.push_back("ids:" + st->GetName());
+ args.push_back(stringify(obj->id));
+
+ /* Add to type -> id set */
+ me->redis->SendCommand(NULL, args);
+ }
+
+ /* Transaction end */
+ me->redis->CommitTransaction();
+
+ delete this;
+}
+
+MODULE_INIT(DatabaseRedis)
diff --git a/modules/database/db_sql_live.cpp b/modules/database/db_sql_live.cpp
index 1a53f98d0..992a15471 100644
--- a/modules/database/db_sql_live.cpp
+++ b/modules/database/db_sql_live.cpp
@@ -188,7 +188,7 @@ class DBMySQL : public Module, public Pipe
if (res.Get(i, "timestamp").empty())
{
clear_null = true;
- std::map<unsigned int, Serializable *>::iterator it = obj->objects.find(id);
+ std::map<uint64_t, Serializable *>::iterator it = obj->objects.find(id);
if (it != obj->objects.end())
delete it->second; // This also removes this object from the map
}
@@ -200,7 +200,7 @@ class DBMySQL : public Module, public Pipe
data[it->first] << it->second;
Serializable *s = NULL;
- std::map<unsigned int, Serializable *>::iterator it = obj->objects.find(id);
+ std::map<uint64_t, Serializable *>::iterator it = obj->objects.find(id);
if (it != obj->objects.end())
s = it->second;
diff --git a/modules/m_redis.cpp b/modules/m_redis.cpp
new file mode 100644
index 000000000..76cb98d2e
--- /dev/null
+++ b/modules/m_redis.cpp
@@ -0,0 +1,620 @@
+/*
+ *
+ * (C) 2003-2013 Anope Team
+ * Contact us at team@anope.org
+ *
+ * Please read COPYING and README for further details.
+ *
+ *
+ */
+
+#include "module.h"
+#include "modules/redis.h"
+
+using namespace Redis;
+
+class MyRedisService;
+
+class RedisSocket : public BinarySocket, public ConnectionSocket
+{
+ size_t ParseReply(Reply &r, const char *buf, size_t l);
+ public:
+ MyRedisService *provider;
+ std::deque<Interface *> interfaces;
+ std::map<Anope::string, Interface *> subinterfaces;
+
+ RedisSocket(MyRedisService *pro, bool ipv6) : Socket(-1, ipv6), provider(pro) { }
+
+ ~RedisSocket();
+
+ void OnConnect() anope_override;
+ void OnError(const Anope::string &error) anope_override;
+
+ bool Read(const char *buffer, size_t l) anope_override;
+};
+
+class Transaction : public Interface
+{
+ public:
+ std::deque<Interface *> interfaces;
+
+ Transaction(Module *creator) : Interface(creator) { }
+
+ ~Transaction()
+ {
+ for (unsigned i = 0; i < interfaces.size(); ++i)
+ {
+ Interface *inter = interfaces[i];
+
+ if (!inter)
+ continue;
+
+ inter->OnError("Interface going away");
+ }
+ }
+
+ void OnResult(const Reply &r) anope_override
+ {
+ /* This is a multi bulk reply of the results of the queued commands
+ * in this transaction
+ */
+
+ Log(LOG_DEBUG_2) << "redis: transaction complete with " << r.multi_bulk.size() << " results";
+
+ for (unsigned i = 0; i < r.multi_bulk.size(); ++i)
+ {
+ const Reply &reply = r.multi_bulk[i];
+
+ if (interfaces.empty())
+ break;
+
+ Interface *i = interfaces.front();
+ interfaces.pop_front();
+
+ if (i)
+ i->OnResult(reply);
+ }
+ }
+};
+
+class MyRedisService : public Provider
+{
+ public:
+ Anope::string host;
+ int port;
+ unsigned db;
+
+ RedisSocket *sock, *sub;
+
+ Transaction ti;
+ bool in_transaction;
+
+ MyRedisService(Module *c, const Anope::string &n, const Anope::string &h, int p, unsigned d) : Provider(c, n), host(h), port(p), db(d), sock(NULL), sub(NULL),
+ ti(c), in_transaction(false)
+ {
+ sock = new RedisSocket(this, host.find(':') != Anope::string::npos);
+ sock->Connect(host, port);
+
+ sub = new RedisSocket(this, host.find(':') != Anope::string::npos);
+ sub->Connect(host, port);
+ }
+
+ ~MyRedisService()
+ {
+ if (sock)
+ {
+ sock->flags[SF_DEAD] = true;
+ sock->provider = NULL;
+ }
+
+ if (sub)
+ {
+ sub->flags[SF_DEAD] = true;
+ sub->provider = NULL;
+ }
+ }
+
+ private:
+ inline void Pack(std::vector<char> &buffer, const char *buf, size_t sz = 0)
+ {
+ if (!sz)
+ sz = strlen(buf);
+
+ size_t old_size = buffer.size();
+ buffer.resize(old_size + sz);
+ std::copy(buf, buf + sz, buffer.begin() + old_size);
+ }
+
+ void Send(RedisSocket *s, Interface *i, const std::vector<std::pair<const char *, size_t> > &args)
+ {
+ std::vector<char> buffer;
+
+ Pack(buffer, "*");
+ Pack(buffer, stringify(args.size()).c_str());
+ Pack(buffer, "\r\n");
+
+ for (unsigned j = 0; j < args.size(); ++j)
+ {
+ const std::pair<const char *, size_t> &pair = args[j];
+
+ Pack(buffer, "$");
+ Pack(buffer, stringify(pair.second).c_str());
+ Pack(buffer, "\r\n");
+
+ Pack(buffer, pair.first, pair.second);
+ Pack(buffer, "\r\n");
+ }
+
+ if (buffer.empty())
+ return;
+
+ s->Write(&buffer[0], buffer.size());
+ if (in_transaction)
+ {
+ ti.interfaces.push_back(i);
+ s->interfaces.push_back(NULL); // For the +Queued response
+ }
+ else
+ s->interfaces.push_back(i);
+ }
+
+ public:
+ void SendCommand(RedisSocket *s, Interface *i, const std::vector<Anope::string> &cmds)
+ {
+ std::vector<std::pair<const char *, size_t> > args;
+ for (unsigned j = 0; j < cmds.size(); ++j)
+ args.push_back(std::make_pair(cmds[j].c_str(), cmds[j].length()));
+ this->Send(s, i, args);
+ }
+
+ void SendCommand(RedisSocket *s, Interface *i, const Anope::string &str)
+ {
+ std::vector<Anope::string> args;
+ spacesepstream(str).GetTokens(args);
+ this->SendCommand(s, i, args);
+ }
+
+ void Send(Interface *i, const std::vector<std::pair<const char *, size_t> > &args) anope_override
+ {
+ if (!sock)
+ {
+ sock = new RedisSocket(this, host.find(':') != Anope::string::npos);
+ sock->Connect(host, port);
+ }
+
+ this->Send(sock, i, args);
+ }
+
+ void SendCommand(Interface *i, const std::vector<Anope::string> &cmds) anope_override
+ {
+ std::vector<std::pair<const char *, size_t> > args;
+ for (unsigned j = 0; j < cmds.size(); ++j)
+ args.push_back(std::make_pair(cmds[j].c_str(), cmds[j].length()));
+ this->Send(i, args);
+ }
+
+ void SendCommand(Interface *i, const Anope::string &str) anope_override
+ {
+ std::vector<Anope::string> args;
+ spacesepstream(str).GetTokens(args);
+ this->SendCommand(i, args);
+ }
+
+ bool BlockAndProcess() anope_override
+ {
+ this->sock->ProcessWrite();
+ this->sock->SetBlocking(true);
+ this->sock->ProcessRead();
+ this->sock->SetBlocking(false);
+ return !this->sock->interfaces.empty();
+ }
+
+ void Subscribe(Interface *i, const Anope::string &pattern) anope_override
+ {
+ if (sub == NULL)
+ {
+ sub = new RedisSocket(this, host.find(':') != Anope::string::npos);
+ sub->Connect(host, port);
+ }
+
+ std::vector<Anope::string> args;
+ args.push_back("PSUBSCRIBE");
+ args.push_back(pattern);
+ this->SendCommand(sub, NULL, args);
+
+ sub->subinterfaces[pattern] = i;
+ }
+
+ void Unsubscribe(const Anope::string &pattern) anope_override
+ {
+ if (sub)
+ sub->subinterfaces.erase(pattern);
+ }
+
+ void StartTransaction() anope_override
+ {
+ if (in_transaction)
+ throw CoreException();
+
+ this->SendCommand(NULL, "MULTI");
+ in_transaction = true;
+ }
+
+ void CommitTransaction() anope_override
+ {
+ /* The result of the transaction comes back to the reply of EXEC as a multi bulk.
+ * The reply to the individual commands that make up the transaction when executed
+ * is a simple +QUEUED
+ */
+ in_transaction = false;
+ this->SendCommand(&this->ti, "EXEC");
+ }
+};
+
+RedisSocket::~RedisSocket()
+{
+ if (provider)
+ {
+ if (provider->sock == this)
+ provider->sock = NULL;
+ else if (provider->sub == this)
+ provider->sub = NULL;
+ }
+
+ for (unsigned i = 0; i < interfaces.size(); ++i)
+ {
+ Interface *inter = interfaces[i];
+
+ if (!inter)
+ continue;
+
+ inter->OnError("Interface going away");
+ }
+}
+
+void RedisSocket::OnConnect()
+{
+ Log() << "redis: Successfully connected to " << provider->name << (this == this->provider->sub ? " (sub)" : "");
+
+ this->provider->SendCommand(NULL, "CLIENT SETNAME Anope");
+ this->provider->SendCommand(NULL, "SELECT " + stringify(provider->db));
+
+ if (this != this->provider->sub)
+ {
+ this->provider->SendCommand(this, NULL, "CONFIG SET notify-keyspace-events KA");
+ }
+}
+
+void RedisSocket::OnError(const Anope::string &error)
+{
+ Log() << "redis: Error on " << provider->name << (this == this->provider->sub ? " (sub)" : "") << ": " << error;
+}
+
+size_t RedisSocket::ParseReply(Reply &r, const char *buffer, size_t l)
+{
+ size_t used = 0;
+
+ if (!l)
+ return used;
+
+ if (r.type == Reply::MULTI_BULK)
+ goto multi_bulk_cont;
+
+ switch (*buffer)
+ {
+ case '+':
+ {
+ Anope::string reason(buffer, 1, l - 1);
+ size_t nl = reason.find("\r\n");
+ Log(LOG_DEBUG_2) << "redis: status ok: " << reason.substr(0, nl);
+ if (nl != Anope::string::npos)
+ {
+ r.type = Reply::OK;
+ used = 1 + nl + 2;
+ }
+ break;
+ }
+ case '-':
+ {
+ Anope::string reason(buffer, 1, l - 1);
+ size_t nl = reason.find("\r\n");
+ Log(LOG_DEBUG) << "redis: status error: " << reason.substr(0, nl);
+ if (nl != Anope::string::npos)
+ {
+ r.type = Reply::ERROR;
+ used = 1 + nl + 2;
+ }
+ break;
+ }
+ case ':':
+ {
+ Anope::string ibuf(buffer, 1, l - 1);
+ size_t nl = ibuf.find("\r\n");
+ if (nl != Anope::string::npos)
+ {
+ try
+ {
+ r.i = convertTo<int64_t>(ibuf.substr(0, nl));
+ }
+ catch (const ConvertException &) { }
+
+ r.type = Reply::INT;
+ used = 1 + nl + 2;
+ }
+ break;
+ }
+ case '$':
+ {
+ Anope::string reply(buffer + 1, l - 1);
+ /* This assumes one bulk can always fit in our recv buffer */
+ size_t nl = reply.find("\r\n");
+ if (nl != Anope::string::npos)
+ {
+ int len;
+ try
+ {
+ len = convertTo<int>(reply.substr(0, nl));
+ if (len >= 0)
+ {
+ if (1 + nl + 2 + len + 2 <= l)
+ {
+ used = 1 + nl + 2 + len + 2;
+ r.bulk = reply.substr(nl + 2, len);
+ r.type = Reply::BULK;
+ }
+ }
+ else
+ {
+ used = 1 + nl + 2 + 2;
+ r.type = Reply::BULK;
+ }
+ }
+ catch (const ConvertException &) { }
+ }
+ break;
+ }
+ multi_bulk_cont:
+ case '*':
+ {
+ if (r.type != Reply::MULTI_BULK)
+ {
+ Anope::string reply(buffer + 1, l - 1);
+ size_t nl = reply.find("\r\n");
+ if (nl != Anope::string::npos)
+ {
+ r.type = Reply::MULTI_BULK;
+ try
+ {
+ r.multi_bulk_size = convertTo<int>(reply.substr(0, nl));
+ }
+ catch (const ConvertException &) { }
+
+ used = 1 + nl + 2;
+ }
+ else
+ break;
+ }
+ else if (r.multi_bulk.size() == r.multi_bulk_size)
+ {
+ /* This multi bulk is already complete, so check the sub bulks */
+ for (unsigned i = 0; i < r.multi_bulk.size(); ++i)
+ if (r.multi_bulk[i].type == Reply::MULTI_BULK)
+ ParseReply(r.multi_bulk[i], buffer + used, l - used);
+ break;
+ }
+
+ for (int i = r.multi_bulk.size(); i < r.multi_bulk_size; ++i)
+ {
+ r.multi_bulk.push_back(Reply());
+ size_t u = ParseReply(r.multi_bulk.back(), buffer + used, l - used);
+ if (!u)
+ {
+ Log(LOG_DEBUG) << "redis: ran out of data to parse";
+ r.multi_bulk.pop_back();
+ break;
+ }
+ used += u;
+ }
+ break;
+ }
+ default:
+ Log(LOG_DEBUG) << "redis: unknown reply " << *buffer;
+ }
+
+ return used;
+}
+
+bool RedisSocket::Read(const char *buffer, size_t l)
+{
+ static std::vector<char> save;
+ std::vector<char> copy;
+
+ if (!save.empty())
+ {
+ std::copy(buffer, buffer + l, std::back_inserter(save));
+
+ copy = save;
+
+ buffer = &copy[0];
+ l = copy.size();
+ }
+
+ while (l)
+ {
+ static Reply r;
+
+ size_t used = this->ParseReply(r, buffer, l);
+ if (!used)
+ {
+ Log(LOG_DEBUG) << "redis: used == 0 ?";
+ r.Clear();
+ break;
+ }
+ else if (used > l)
+ {
+ Log(LOG_DEBUG) << "redis: used > l ?";
+ r.Clear();
+ break;
+ }
+
+ /* Full result is not here yet */
+ if (r.type == Reply::MULTI_BULK && static_cast<unsigned>(r.multi_bulk_size) != r.multi_bulk.size())
+ {
+ buffer += used;
+ l -= used;
+ break;
+ }
+
+ if (this == provider->sub)
+ {
+ if (r.multi_bulk.size() == 4)
+ {
+ /* pmessage
+ * pattern subscribed to
+ * __keyevent@0__:set
+ * key
+ */
+ std::map<Anope::string, Interface *>::iterator it = this->subinterfaces.find(r.multi_bulk[1].bulk);
+ if (it != this->subinterfaces.end())
+ it->second->OnResult(r);
+ }
+ }
+ else
+ {
+ if (this->interfaces.empty())
+ {
+ Log(LOG_DEBUG) << "redis: no interfaces?";
+ }
+ else
+ {
+ Interface *i = this->interfaces.front();
+ this->interfaces.pop_front();
+
+ if (i)
+ {
+ if (r.type != Reply::ERROR)
+ i->OnResult(r);
+ else
+ i->OnError(r.bulk);
+ }
+ }
+ }
+
+ buffer += used;
+ l -= used;
+
+ r.Clear();
+ }
+
+ if (l)
+ {
+ save.resize(l);
+ std::copy(buffer, buffer + l, save.begin());
+ }
+ else
+ std::vector<char>().swap(save);
+
+ return true;
+}
+
+
+class ModuleRedis : public Module
+{
+ std::map<Anope::string, MyRedisService *> services;
+
+ public:
+ ModuleRedis(const Anope::string &modname, const Anope::string &creator) : Module(modname, creator, EXTRA | VENDOR)
+ {
+ Implementation i[] = { I_OnReload, I_OnModuleUnload };
+ ModuleManager::Attach(i, this, sizeof(i) / sizeof(Implementation));
+ }
+
+ ~ModuleRedis()
+ {
+ for (std::map<Anope::string, MyRedisService *>::iterator it = services.begin(); it != services.end(); ++it)
+ {
+ MyRedisService *p = it->second;
+
+ delete p->sock;
+ p->sock = NULL;
+ delete p->sub;
+ p->sub = NULL;
+
+ delete p;
+ }
+ }
+
+ void OnReload(Configuration::Conf *conf) anope_override
+ {
+ Configuration::Block *block = conf->GetModule(this);
+ std::vector<Anope::string> new_services;
+
+ for (int i = 0; i < block->CountBlock("redis"); ++i)
+ {
+ Configuration::Block *redis = block->GetBlock("redis", i);
+
+ const Anope::string &name = redis->Get<const Anope::string>("name"),
+ &ip = redis->Get<const Anope::string>("ip");
+ int port = redis->Get<int>("port");
+ unsigned db = redis->Get<unsigned>("db");
+
+ delete services[name];
+ services[name] = new MyRedisService(this, name, ip, port, db);
+ new_services.push_back(name);
+ }
+
+ for (std::map<Anope::string, MyRedisService *>::iterator it = services.begin(); it != services.end();)
+ {
+ Provider *p = it->second;
+ ++it;
+
+ if (std::find(new_services.begin(), new_services.end(), p->name) == new_services.end())
+ delete it->second;
+ }
+ }
+
+ void OnModuleUnload(User *, Module *m) anope_override
+ {
+ for (std::map<Anope::string, MyRedisService *>::iterator it = services.begin(); it != services.end(); ++it)
+ {
+ MyRedisService *p = it->second;
+
+ if (p->sock)
+ for (unsigned i = p->sock->interfaces.size(); i > 0; --i)
+ {
+ Interface *inter = p->sock->interfaces[i - 1];
+
+ if (inter && inter->owner == m)
+ {
+ inter->OnError(m->name + " being unloaded");
+ p->sock->interfaces.erase(p->sock->interfaces.begin() + i - 1);
+ }
+ }
+
+ if (p->sub)
+ for (unsigned i = p->sub->interfaces.size(); i > 0; --i)
+ {
+ Interface *inter = p->sub->interfaces[i - 1];
+
+ if (inter && inter->owner == m)
+ {
+ inter->OnError(m->name + " being unloaded");
+ p->sub->interfaces.erase(p->sub->interfaces.begin() + i - 1);
+ }
+ }
+
+ for (unsigned i = p->ti.interfaces.size(); i > 0; --i)
+ {
+ Interface *inter = p->ti.interfaces[i - 1];
+
+ if (inter && inter->owner == m)
+ {
+ inter->OnError(m->name + " being unloaded");
+ p->ti.interfaces.erase(p->ti.interfaces.begin() + i - 1);
+ }
+ }
+ }
+ }
+};
+
+MODULE_INIT(ModuleRedis)
diff --git a/src/serialize.cpp b/src/serialize.cpp
index 444cf79da..bb95cdc41 100644
--- a/src/serialize.cpp
+++ b/src/serialize.cpp
@@ -44,7 +44,7 @@ void Serialize::CheckTypes()
}
}
-Serializable::Serializable(const Anope::string &serialize_type) : last_commit(0), last_commit_time(0), id(0)
+Serializable::Serializable(const Anope::string &serialize_type) : last_commit(0), last_commit_time(0), id(0), redis_ignore(0)
{
if (SerializableItems == NULL)
SerializableItems = new std::list<Serializable *>();
@@ -58,7 +58,7 @@ Serializable::Serializable(const Anope::string &serialize_type) : last_commit(0)
FOREACH_MOD(I_OnSerializableConstruct, OnSerializableConstruct(this));
}
-Serializable::Serializable(const Serializable &other) : last_commit(0), last_commit_time(0), id(0)
+Serializable::Serializable(const Serializable &other) : last_commit(0), last_commit_time(0), id(0), redis_ignore(0)
{
SerializableItems->push_back(this);
this->s_iter = SerializableItems->end();
diff --git a/src/socket_clients.cpp b/src/socket_clients.cpp
index 3686b2c0a..7893a07a9 100644
--- a/src/socket_clients.cpp
+++ b/src/socket_clients.cpp
@@ -17,10 +17,6 @@
#include <errno.h>
-ConnectionSocket::ConnectionSocket() : Socket()
-{
-}
-
void ConnectionSocket::Connect(const Anope::string &TargetHost, int Port)
{
this->io->Connect(this, TargetHost, Port);