diff options
author | Adam <Adam@anope.org> | 2011-02-17 14:31:21 -0500 |
---|---|---|
committer | Adam <Adam@anope.org> | 2011-02-17 14:31:21 -0500 |
commit | 536ea2189ae1ff8bf05583ed03fac86797616ac9 (patch) | |
tree | 30b5056f53dea0d01be10f24360a443833e6565a | |
parent | 18bd33f9a477f7ce00d6f3c00acc0b0b68f5028d (diff) |
Split db_mysql_live into two modules so other modules can make use of the asynchronous command interface
-rw-r--r-- | data/example.conf | 23 | ||||
-rw-r--r-- | include/threadengine.h | 6 | ||||
-rw-r--r-- | modules/extra/async_commands.h | 30 | ||||
-rw-r--r-- | modules/extra/db_mysql_live.cpp | 229 | ||||
-rw-r--r-- | modules/extra/m_async_commands.cpp | 137 | ||||
-rw-r--r-- | src/threadengine.cpp | 6 | ||||
-rw-r--r-- | src/threadengines/threadengine_pthread.cpp | 7 | ||||
-rw-r--r-- | src/threadengines/threadengine_win32.cpp | 7 |
8 files changed, 275 insertions, 170 deletions
diff --git a/data/example.conf b/data/example.conf index 61f5069f0..3eb1c0125 100644 --- a/data/example.conf +++ b/data/example.conf @@ -278,6 +278,8 @@ options * - db_mysql * - db_mysql_live * + * You may have more than one loaded at once! + * * The db_mysql_live module is an extension to db_mysql, and should only be used if * db_mysql is being used. This module pulls data in real time from SQL as it is * requested by the core as a result of someone executing commands. @@ -285,12 +287,11 @@ options * This effectively allows you to edit your database and have it be immediately * reflected back in Anope. * - * It is highly recommended you only use this module if your databases are located - * locally as this module will generate many queries per command. - * db_mysql_live only uses threads for commands and non-blocking queries, so it is safe to - * use on large networks without worrying about response times. + * For information on how to make db_mysql_live use asynchronous queries see + * m_async_commands. * - * NOTE: You can and probably should use db_plain together with db_mysql/db_mysql_live + * At this time db_mysql_live only supports pulling data in real time from the four + * main tables: anope_bs_core, anope_cs_info, anope_ns_alias, and anope_ns_core. * */ database = "db_plain" @@ -2030,6 +2031,18 @@ alias } +/* + * m_async_commands + * + * Creates a thread for each command executed by a user. You should + * only load this if you are using a module designed to work with this. + * + * If this is loaded with db_mysql_live then Anope will support + * processing multiple commands at once which will help very busy networks + * with lag issues caused from the overhead of SQL queries caused by db_mysq_live. + */ +#module { name = "m_async_commands" } + /* Provides the !kb fantasy command */ alias { diff --git a/include/threadengine.h b/include/threadengine.h index 822774c6e..20e3c81eb 100644 --- a/include/threadengine.h +++ b/include/threadengine.h @@ -45,7 +45,7 @@ class CoreExport Thread : public Extensible { private: /* Set to true to tell the thread to finish and we are waiting for it */ - bool Exit; + bool exit; public: /* Handle for this thread */ @@ -67,6 +67,10 @@ class CoreExport Thread : public Extensible */ void SetExitState(); + /** Exit the thread. Note that the thread still must be joined to free resources! + */ + void Exit(); + /** Returns the exit state of the thread * @return true if we want to exit */ diff --git a/modules/extra/async_commands.h b/modules/extra/async_commands.h new file mode 100644 index 000000000..b6b9e7c2b --- /dev/null +++ b/modules/extra/async_commands.h @@ -0,0 +1,30 @@ + +class CommandMutex : public Thread +{ + public: + // Mutex used by this command to allow the core to drop and pick up processing of it at will + Mutex mutex; + // Set to true when this thread is processing data that is not thread safe (eg, the command) + bool processing; + Command *command; + CommandSource source; + std::vector<Anope::string> params; + + CommandMutex() : Thread(), processing(true) { } + + ~CommandMutex() { } + + virtual void Run() = 0; + + virtual void Lock() = 0; + + virtual void Unlock() = 0; +}; + +class AsynchCommandsService : public Service +{ + public: + AsynchCommandsService(Module *o, const Anope::string &n) : Service(o, n) { } + virtual CommandMutex *CurrentCommand() = 0; +}; + diff --git a/modules/extra/db_mysql_live.cpp b/modules/extra/db_mysql_live.cpp index 55e891623..8f14f3998 100644 --- a/modules/extra/db_mysql_live.cpp +++ b/modules/extra/db_mysql_live.cpp @@ -1,67 +1,11 @@ #include "module.h" +#include "async_commands.h" #include "sql.h" -class CommandMutex; -static std::list<CommandMutex *> commands; - -/* Current command being processed by the core */ -static CommandMutex *current_command = NULL; -/* Mutex held by the core when it is processing. Used by threads to halt the core */ -static Mutex main_mutex; - -class CommandMutex : public Thread -{ - public: - // Mutex used by this command to allow the core to drop and pick up processing of it at will - Mutex mutex; - // Set to true when this thread is processing data that is not thread safe (eg, the command) - bool processing; - Command *command; - CommandSource source; - std::vector<Anope::string> params; - - CommandMutex() : Thread(), processing(true) - { - commands.push_back(this); - current_command = this; - } - - ~CommandMutex() - { - std::list<CommandMutex *>::iterator it = std::find(commands.begin(), commands.end(), this); - if (it != commands.end()) - commands.erase(it); - if (this == current_command) - current_command = NULL; - } - - void Run() - { - User *u = this->source.u; - BotInfo *bi = this->source.owner; - - if (!command->permission.empty() && !u->Account()->HasCommand(command->permission)) - { - u->SendMessage(bi, LanguageString::ACCESS_DENIED); - Log(LOG_COMMAND, "denied", bi) << "Access denied for user " << u->GetMask() << " with command " << command; - } - else - { - CommandReturn ret = command->Execute(source, params); - - if (ret == MOD_CONT) - { - FOREACH_MOD(I_OnPostCommand, OnPostCommand(source, command, params)); - } - } - - main_mutex.Unlock(); - } -}; - -class MySQLLiveModule : public Module, public Pipe +class MySQLLiveModule : public Module { service_reference<SQLProvider> SQL; + service_reference<AsynchCommandsService> ACS; SQLResult RunQuery(const Anope::string &query) { @@ -76,86 +20,41 @@ class MySQLLiveModule : public Module, public Pipe return SQL ? SQL->Escape(query) : query; } - public: - MySQLLiveModule(const Anope::string &modname, const Anope::string &creator) : Module(modname, creator), SQL("mysql/main") + CommandMutex *CurrentCommand() { - Implementation i[] = { I_OnPreCommand, I_OnFindBot, I_OnFindChan, I_OnFindNick, I_OnFindCore }; - ModuleManager::Attach(i, this, 5); + if (this->ACS) + return this->ACS->CurrentCommand(); + return NULL; } - EventReturn OnPreCommand(CommandSource &source, Command *command, const std::vector<Anope::string> ¶ms) - { - if (this->SQL) - { - CommandMutex *cm = new CommandMutex(); - try - { - cm->mutex.Lock(); - cm->command = command; - cm->source = source; - cm->params = params; - - commands.push_back(cm); - - // Give processing to the command thread - Log(LOG_DEBUG_2) << "db_mysql_live: Waiting for command thread " << cm->command->name << " from " << source.u->nick; - threadEngine.Start(cm); - main_mutex.Lock(); - - return EVENT_STOP; - } - catch (const CoreException &ex) - { - delete cm; - Log() << "db_mysql_live: Unable to thread for command: " << ex.GetReason(); - } - } - - return EVENT_CONTINUE; - } - - void OnNotify() + public: + MySQLLiveModule(const Anope::string &modname, const Anope::string &creator) : + Module(modname, creator), SQL("mysql/main"), ACS("asynch_commands") { - for (std::list<CommandMutex *>::iterator it = commands.begin(), it_end = commands.end(); it != it_end; ++it) - { - CommandMutex *cm = *it; - - // Thread engine will pick this up later - if (cm->GetExitState() || !cm->processing) - continue; - - Log(LOG_DEBUG_2) << "db_mysql_live: Waiting for command thread " << cm->command->name << " from " << cm->source.u->nick; - current_command = cm; - - // Unlock to give processing back to the command thread - cm->mutex.Unlock(); - // Relock to regain processing once the command thread hangs for any reason - main_mutex.Lock(); - - current_command = NULL; - } + Implementation i[] = { I_OnFindBot, I_OnFindChan, I_OnFindNick, I_OnFindCore }; + ModuleManager::Attach(i, this, 4); } - void OnFindBot(const Anope::string &nick) { - if (!current_command) + static bool lookup = true; + if (lookup == false) + { + lookup = true; return; - - CommandMutex *cm = current_command; - - // Give it back to the core - cm->processing = false; - main_mutex.Unlock(); - SQLResult res = this->RunQuery("SELECT * FROM `anope_bs_core` WHERE `nick` = '" + this->Escape(nick) + "'"); - // And take it back... - cm->processing = true; - this->Notify(); - cm->mutex.Lock(); + } try { - current_command = NULL; + CommandMutex *current_command = this->CurrentCommand(); + + if (current_command) + current_command->Unlock(); + SQLResult res = this->RunQuery("SELECT * FROM `anope_bs_core` WHERE `nick` = '" + this->Escape(nick) + "'"); + if (current_command) + current_command->Lock(); + + lookup = false; BotInfo *bi = findbot(res.Get(0, "nick")); if (!bi) bi = new BotInfo(res.Get(0, "nick"), res.Get(0, "user"), res.Get(0, "host"), res.Get(0, "rname")); @@ -177,21 +76,24 @@ class MySQLLiveModule : public Module, public Pipe void OnFindChan(const Anope::string &chname) { - if (!current_command) + static bool lookup = true; + if (lookup == false) + { + lookup = true; return; - - CommandMutex *cm = current_command; - - cm->processing = false; - main_mutex.Unlock(); - SQLResult res = this->RunQuery("SELECT * FROM `anope_cs_info` WHERE `name` = '" + this->Escape(chname) + "'"); - cm->processing = true; - this->Notify(); - cm->mutex.Lock(); + } try { - current_command = NULL; + CommandMutex *current_command = this->CurrentCommand(); + + if (current_command) + current_command->Unlock(); + SQLResult res = this->RunQuery("SELECT * FROM `anope_cs_info` WHERE `name` = '" + this->Escape(chname) + "'"); + if (current_command) + current_command->Lock(); + + lookup = false; ChannelInfo *ci = cs_findchan(res.Get(0, "name")); if (!ci) ci = new ChannelInfo(res.Get(0, "name")); @@ -263,25 +165,27 @@ class MySQLLiveModule : public Module, public Pipe void OnFindNick(const Anope::string &nick) { - if (!current_command) + static bool lookup = true; + if (lookup == false) + { + lookup = true; return; - - CommandMutex *cm = current_command; - - cm->processing = false; - main_mutex.Unlock(); - SQLResult res = this->RunQuery("SELECT * FROM `anope_ns_alias` WHERE `nick` = '" + this->Escape(nick) + "'"); - cm->processing = true; - this->Notify(); - cm->mutex.Lock(); + } try { - // Make OnFindCore trigger and look up the core too + CommandMutex *current_command = this->CurrentCommand(); + + if (current_command) + current_command->Unlock(); + SQLResult res = this->RunQuery("SELECT * FROM `anope_ns_alias` WHERE `nick` = '" + this->Escape(nick) + "'"); + if (current_command) + current_command->Lock(); + NickCore *nc = findcore(res.Get(0, "display")); if (!nc) return; - current_command = NULL; + lookup = false; NickAlias *na = findnick(res.Get(0, "nick")); if (!na) na = new NickAlias(res.Get(0, "nick"), nc); @@ -310,21 +214,24 @@ class MySQLLiveModule : public Module, public Pipe void OnFindCore(const Anope::string &nick) { - if (!current_command) + static bool lookup = true; + if (lookup == false) + { + lookup = true; return; - - CommandMutex *cm = current_command; - - cm->processing = false; - main_mutex.Unlock(); - SQLResult res = this->RunQuery("SELECT * FROM `anope_ns_core` WHERE `name` = '" + this->Escape(nick) + "'"); - cm->processing = true; - this->Notify(); - cm->mutex.Lock(); + } try { - current_command = NULL; + CommandMutex *current_command = this->CurrentCommand(); + + if (current_command) + current_command->Unlock(); + SQLResult res = this->RunQuery("SELECT * FROM `anope_ns_core` WHERE `name` = '" + this->Escape(nick) + "'"); + if (current_command) + current_command->Lock(); + + lookup = false; NickCore *nc = findcore(res.Get(0, "display")); if (!nc) nc = new NickCore(res.Get(0, "display")); diff --git a/modules/extra/m_async_commands.cpp b/modules/extra/m_async_commands.cpp new file mode 100644 index 000000000..ddc660ea8 --- /dev/null +++ b/modules/extra/m_async_commands.cpp @@ -0,0 +1,137 @@ +#include "module.h" +#include "async_commands.h" + +static Pipe *me; +static CommandMutex *current_command = NULL; +static std::list<CommandMutex *> commands; +/* Mutex held by the core when it is processing. Used by threads to halt the core */ +static Mutex main_mutex; + +class AsynchCommandMutex : public CommandMutex +{ + public: + AsynchCommandMutex() : CommandMutex() + { + commands.push_back(this); + current_command = this; + } + + ~AsynchCommandMutex() + { + std::list<CommandMutex *>::iterator it = std::find(commands.begin(), commands.end(), this); + if (it != commands.end()) + commands.erase(it); + if (this == current_command) + current_command = NULL; + } + + void Run() + { + User *u = this->source.u; + BotInfo *bi = this->source.owner; + + if (!command->permission.empty() && !u->Account()->HasCommand(command->permission)) + { + u->SendMessage(bi, LanguageString::ACCESS_DENIED); + Log(LOG_COMMAND, "denied", bi) << "Access denied for user " << u->GetMask() << " with command " << command; + } + else + { + CommandReturn ret = command->Execute(source, params); + + if (ret == MOD_CONT) + { + FOREACH_MOD(I_OnPostCommand, OnPostCommand(source, command, params)); + } + } + + main_mutex.Unlock(); + } + + void Lock() + { + this->processing = true; + me->Notify(); + this->mutex.Lock(); + } + + void Unlock() + { + this->processing = false; + main_mutex.Unlock(); + } +}; + + +class ModuleAsynchCommands : public Module, public Pipe, public AsynchCommandsService +{ + public: + ModuleAsynchCommands(const Anope::string &modname, const Anope::string &creator) : Module(modname, creator), Pipe(), AsynchCommandsService(this, "asynch_commands") + { + me = this; + + this->SetPermanent(true); + + main_mutex.Lock(); + + Implementation i[] = { I_OnPreCommand }; + ModuleManager::Attach(i, this, 1); + + ModuleManager::RegisterService(this); + } + + EventReturn OnPreCommand(CommandSource &source, Command *command, const std::vector<Anope::string> ¶ms) + { + AsynchCommandMutex *cm = new AsynchCommandMutex(); + try + { + cm->mutex.Lock(); + cm->command = command; + cm->source = source; + cm->params = params; + + // Give processing to the command thread + Log(LOG_DEBUG_2) << "Waiting for command thread " << cm->command->name << " from " << source.u->nick; + threadEngine.Start(cm); + main_mutex.Lock(); + + return EVENT_STOP; + } + catch (const CoreException &ex) + { + delete cm; + Log() << "Unable to thread for command: " << ex.GetReason(); + } + + return EVENT_CONTINUE; + } + + void OnNotify() + { + for (std::list<CommandMutex *>::iterator it = commands.begin(), it_end = commands.end(); it != it_end; ++it) + { + CommandMutex *cm = *it; + + // Thread engine will pick this up later + if (cm->GetExitState() || !cm->processing) + continue; + + Log(LOG_DEBUG_2) << "Waiting for command thread " << cm->command->name << " from " << cm->source.u->nick; + current_command = cm; + + // Unlock to give processing back to the command thread + cm->mutex.Unlock(); + // Relock to regain processing once the command thread hangs for any reason + main_mutex.Lock(); + + current_command = NULL; + } + } + + CommandMutex *CurrentCommand() + { + return current_command; + } +}; + +MODULE_INIT(ModuleAsynchCommands) diff --git a/src/threadengine.cpp b/src/threadengine.cpp index 0ac049c38..c2aa3da20 100644 --- a/src/threadengine.cpp +++ b/src/threadengine.cpp @@ -20,7 +20,7 @@ void ThreadEngine::Process() /** Threads constructor */ -Thread::Thread() : Exit(false) +Thread::Thread() : exit(false) { threadEngine.threads.push_back(this); } @@ -41,7 +41,7 @@ Thread::~Thread() */ void Thread::SetExitState() { - Exit = true; + exit = true; } /** Returns the exit state of the thread @@ -49,7 +49,7 @@ void Thread::SetExitState() */ bool Thread::GetExitState() const { - return Exit; + return exit; } /** Called to run the thread, should be overloaded diff --git a/src/threadengines/threadengine_pthread.cpp b/src/threadengines/threadengine_pthread.cpp index acdccfeb5..e8242bb82 100644 --- a/src/threadengines/threadengine_pthread.cpp +++ b/src/threadengines/threadengine_pthread.cpp @@ -39,6 +39,13 @@ void Thread::Join() pthread_join(Handle, NULL); } +/** Exit the thread. Note that the thread still must be joined to free resources! + */ +void Thread::Exit() +{ + pthread_exit(0); +} + /** Start a new thread * @param thread A pointer to a newley allocated thread */ diff --git a/src/threadengines/threadengine_win32.cpp b/src/threadengines/threadengine_win32.cpp index e6ad725bf..218f711c4 100644 --- a/src/threadengines/threadengine_win32.cpp +++ b/src/threadengines/threadengine_win32.cpp @@ -31,6 +31,13 @@ void Thread::Join() WaitForSingleObject(Handle, INFINITE); } +/** Exit the thread. Note that the thread still must be joined to free resources! + */ +void Thread::Exit() +{ + ExitThread(0); +} + /** Start a new thread * @param thread A pointer to a newley allocated thread */ |