summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--data/example.conf23
-rw-r--r--include/threadengine.h6
-rw-r--r--modules/extra/async_commands.h30
-rw-r--r--modules/extra/db_mysql_live.cpp229
-rw-r--r--modules/extra/m_async_commands.cpp137
-rw-r--r--src/threadengine.cpp6
-rw-r--r--src/threadengines/threadengine_pthread.cpp7
-rw-r--r--src/threadengines/threadengine_win32.cpp7
8 files changed, 275 insertions, 170 deletions
diff --git a/data/example.conf b/data/example.conf
index 61f5069f0..3eb1c0125 100644
--- a/data/example.conf
+++ b/data/example.conf
@@ -278,6 +278,8 @@ options
* - db_mysql
* - db_mysql_live
*
+ * You may have more than one loaded at once!
+ *
* The db_mysql_live module is an extension to db_mysql, and should only be used if
* db_mysql is being used. This module pulls data in real time from SQL as it is
* requested by the core as a result of someone executing commands.
@@ -285,12 +287,11 @@ options
* This effectively allows you to edit your database and have it be immediately
* reflected back in Anope.
*
- * It is highly recommended you only use this module if your databases are located
- * locally as this module will generate many queries per command.
- * db_mysql_live only uses threads for commands and non-blocking queries, so it is safe to
- * use on large networks without worrying about response times.
+ * For information on how to make db_mysql_live use asynchronous queries see
+ * m_async_commands.
*
- * NOTE: You can and probably should use db_plain together with db_mysql/db_mysql_live
+ * At this time db_mysql_live only supports pulling data in real time from the four
+ * main tables: anope_bs_core, anope_cs_info, anope_ns_alias, and anope_ns_core.
*
*/
database = "db_plain"
@@ -2030,6 +2031,18 @@ alias
}
+/*
+ * m_async_commands
+ *
+ * Creates a thread for each command executed by a user. You should
+ * only load this if you are using a module designed to work with this.
+ *
+ * If this is loaded with db_mysql_live then Anope will support
+ * processing multiple commands at once which will help very busy networks
+ * with lag issues caused from the overhead of SQL queries caused by db_mysq_live.
+ */
+#module { name = "m_async_commands" }
+
/* Provides the !kb fantasy command */
alias
{
diff --git a/include/threadengine.h b/include/threadengine.h
index 822774c6e..20e3c81eb 100644
--- a/include/threadengine.h
+++ b/include/threadengine.h
@@ -45,7 +45,7 @@ class CoreExport Thread : public Extensible
{
private:
/* Set to true to tell the thread to finish and we are waiting for it */
- bool Exit;
+ bool exit;
public:
/* Handle for this thread */
@@ -67,6 +67,10 @@ class CoreExport Thread : public Extensible
*/
void SetExitState();
+ /** Exit the thread. Note that the thread still must be joined to free resources!
+ */
+ void Exit();
+
/** Returns the exit state of the thread
* @return true if we want to exit
*/
diff --git a/modules/extra/async_commands.h b/modules/extra/async_commands.h
new file mode 100644
index 000000000..b6b9e7c2b
--- /dev/null
+++ b/modules/extra/async_commands.h
@@ -0,0 +1,30 @@
+
+class CommandMutex : public Thread
+{
+ public:
+ // Mutex used by this command to allow the core to drop and pick up processing of it at will
+ Mutex mutex;
+ // Set to true when this thread is processing data that is not thread safe (eg, the command)
+ bool processing;
+ Command *command;
+ CommandSource source;
+ std::vector<Anope::string> params;
+
+ CommandMutex() : Thread(), processing(true) { }
+
+ ~CommandMutex() { }
+
+ virtual void Run() = 0;
+
+ virtual void Lock() = 0;
+
+ virtual void Unlock() = 0;
+};
+
+class AsynchCommandsService : public Service
+{
+ public:
+ AsynchCommandsService(Module *o, const Anope::string &n) : Service(o, n) { }
+ virtual CommandMutex *CurrentCommand() = 0;
+};
+
diff --git a/modules/extra/db_mysql_live.cpp b/modules/extra/db_mysql_live.cpp
index 55e891623..8f14f3998 100644
--- a/modules/extra/db_mysql_live.cpp
+++ b/modules/extra/db_mysql_live.cpp
@@ -1,67 +1,11 @@
#include "module.h"
+#include "async_commands.h"
#include "sql.h"
-class CommandMutex;
-static std::list<CommandMutex *> commands;
-
-/* Current command being processed by the core */
-static CommandMutex *current_command = NULL;
-/* Mutex held by the core when it is processing. Used by threads to halt the core */
-static Mutex main_mutex;
-
-class CommandMutex : public Thread
-{
- public:
- // Mutex used by this command to allow the core to drop and pick up processing of it at will
- Mutex mutex;
- // Set to true when this thread is processing data that is not thread safe (eg, the command)
- bool processing;
- Command *command;
- CommandSource source;
- std::vector<Anope::string> params;
-
- CommandMutex() : Thread(), processing(true)
- {
- commands.push_back(this);
- current_command = this;
- }
-
- ~CommandMutex()
- {
- std::list<CommandMutex *>::iterator it = std::find(commands.begin(), commands.end(), this);
- if (it != commands.end())
- commands.erase(it);
- if (this == current_command)
- current_command = NULL;
- }
-
- void Run()
- {
- User *u = this->source.u;
- BotInfo *bi = this->source.owner;
-
- if (!command->permission.empty() && !u->Account()->HasCommand(command->permission))
- {
- u->SendMessage(bi, LanguageString::ACCESS_DENIED);
- Log(LOG_COMMAND, "denied", bi) << "Access denied for user " << u->GetMask() << " with command " << command;
- }
- else
- {
- CommandReturn ret = command->Execute(source, params);
-
- if (ret == MOD_CONT)
- {
- FOREACH_MOD(I_OnPostCommand, OnPostCommand(source, command, params));
- }
- }
-
- main_mutex.Unlock();
- }
-};
-
-class MySQLLiveModule : public Module, public Pipe
+class MySQLLiveModule : public Module
{
service_reference<SQLProvider> SQL;
+ service_reference<AsynchCommandsService> ACS;
SQLResult RunQuery(const Anope::string &query)
{
@@ -76,86 +20,41 @@ class MySQLLiveModule : public Module, public Pipe
return SQL ? SQL->Escape(query) : query;
}
- public:
- MySQLLiveModule(const Anope::string &modname, const Anope::string &creator) : Module(modname, creator), SQL("mysql/main")
+ CommandMutex *CurrentCommand()
{
- Implementation i[] = { I_OnPreCommand, I_OnFindBot, I_OnFindChan, I_OnFindNick, I_OnFindCore };
- ModuleManager::Attach(i, this, 5);
+ if (this->ACS)
+ return this->ACS->CurrentCommand();
+ return NULL;
}
- EventReturn OnPreCommand(CommandSource &source, Command *command, const std::vector<Anope::string> &params)
- {
- if (this->SQL)
- {
- CommandMutex *cm = new CommandMutex();
- try
- {
- cm->mutex.Lock();
- cm->command = command;
- cm->source = source;
- cm->params = params;
-
- commands.push_back(cm);
-
- // Give processing to the command thread
- Log(LOG_DEBUG_2) << "db_mysql_live: Waiting for command thread " << cm->command->name << " from " << source.u->nick;
- threadEngine.Start(cm);
- main_mutex.Lock();
-
- return EVENT_STOP;
- }
- catch (const CoreException &ex)
- {
- delete cm;
- Log() << "db_mysql_live: Unable to thread for command: " << ex.GetReason();
- }
- }
-
- return EVENT_CONTINUE;
- }
-
- void OnNotify()
+ public:
+ MySQLLiveModule(const Anope::string &modname, const Anope::string &creator) :
+ Module(modname, creator), SQL("mysql/main"), ACS("asynch_commands")
{
- for (std::list<CommandMutex *>::iterator it = commands.begin(), it_end = commands.end(); it != it_end; ++it)
- {
- CommandMutex *cm = *it;
-
- // Thread engine will pick this up later
- if (cm->GetExitState() || !cm->processing)
- continue;
-
- Log(LOG_DEBUG_2) << "db_mysql_live: Waiting for command thread " << cm->command->name << " from " << cm->source.u->nick;
- current_command = cm;
-
- // Unlock to give processing back to the command thread
- cm->mutex.Unlock();
- // Relock to regain processing once the command thread hangs for any reason
- main_mutex.Lock();
-
- current_command = NULL;
- }
+ Implementation i[] = { I_OnFindBot, I_OnFindChan, I_OnFindNick, I_OnFindCore };
+ ModuleManager::Attach(i, this, 4);
}
-
void OnFindBot(const Anope::string &nick)
{
- if (!current_command)
+ static bool lookup = true;
+ if (lookup == false)
+ {
+ lookup = true;
return;
-
- CommandMutex *cm = current_command;
-
- // Give it back to the core
- cm->processing = false;
- main_mutex.Unlock();
- SQLResult res = this->RunQuery("SELECT * FROM `anope_bs_core` WHERE `nick` = '" + this->Escape(nick) + "'");
- // And take it back...
- cm->processing = true;
- this->Notify();
- cm->mutex.Lock();
+ }
try
{
- current_command = NULL;
+ CommandMutex *current_command = this->CurrentCommand();
+
+ if (current_command)
+ current_command->Unlock();
+ SQLResult res = this->RunQuery("SELECT * FROM `anope_bs_core` WHERE `nick` = '" + this->Escape(nick) + "'");
+ if (current_command)
+ current_command->Lock();
+
+ lookup = false;
BotInfo *bi = findbot(res.Get(0, "nick"));
if (!bi)
bi = new BotInfo(res.Get(0, "nick"), res.Get(0, "user"), res.Get(0, "host"), res.Get(0, "rname"));
@@ -177,21 +76,24 @@ class MySQLLiveModule : public Module, public Pipe
void OnFindChan(const Anope::string &chname)
{
- if (!current_command)
+ static bool lookup = true;
+ if (lookup == false)
+ {
+ lookup = true;
return;
-
- CommandMutex *cm = current_command;
-
- cm->processing = false;
- main_mutex.Unlock();
- SQLResult res = this->RunQuery("SELECT * FROM `anope_cs_info` WHERE `name` = '" + this->Escape(chname) + "'");
- cm->processing = true;
- this->Notify();
- cm->mutex.Lock();
+ }
try
{
- current_command = NULL;
+ CommandMutex *current_command = this->CurrentCommand();
+
+ if (current_command)
+ current_command->Unlock();
+ SQLResult res = this->RunQuery("SELECT * FROM `anope_cs_info` WHERE `name` = '" + this->Escape(chname) + "'");
+ if (current_command)
+ current_command->Lock();
+
+ lookup = false;
ChannelInfo *ci = cs_findchan(res.Get(0, "name"));
if (!ci)
ci = new ChannelInfo(res.Get(0, "name"));
@@ -263,25 +165,27 @@ class MySQLLiveModule : public Module, public Pipe
void OnFindNick(const Anope::string &nick)
{
- if (!current_command)
+ static bool lookup = true;
+ if (lookup == false)
+ {
+ lookup = true;
return;
-
- CommandMutex *cm = current_command;
-
- cm->processing = false;
- main_mutex.Unlock();
- SQLResult res = this->RunQuery("SELECT * FROM `anope_ns_alias` WHERE `nick` = '" + this->Escape(nick) + "'");
- cm->processing = true;
- this->Notify();
- cm->mutex.Lock();
+ }
try
{
- // Make OnFindCore trigger and look up the core too
+ CommandMutex *current_command = this->CurrentCommand();
+
+ if (current_command)
+ current_command->Unlock();
+ SQLResult res = this->RunQuery("SELECT * FROM `anope_ns_alias` WHERE `nick` = '" + this->Escape(nick) + "'");
+ if (current_command)
+ current_command->Lock();
+
NickCore *nc = findcore(res.Get(0, "display"));
if (!nc)
return;
- current_command = NULL;
+ lookup = false;
NickAlias *na = findnick(res.Get(0, "nick"));
if (!na)
na = new NickAlias(res.Get(0, "nick"), nc);
@@ -310,21 +214,24 @@ class MySQLLiveModule : public Module, public Pipe
void OnFindCore(const Anope::string &nick)
{
- if (!current_command)
+ static bool lookup = true;
+ if (lookup == false)
+ {
+ lookup = true;
return;
-
- CommandMutex *cm = current_command;
-
- cm->processing = false;
- main_mutex.Unlock();
- SQLResult res = this->RunQuery("SELECT * FROM `anope_ns_core` WHERE `name` = '" + this->Escape(nick) + "'");
- cm->processing = true;
- this->Notify();
- cm->mutex.Lock();
+ }
try
{
- current_command = NULL;
+ CommandMutex *current_command = this->CurrentCommand();
+
+ if (current_command)
+ current_command->Unlock();
+ SQLResult res = this->RunQuery("SELECT * FROM `anope_ns_core` WHERE `name` = '" + this->Escape(nick) + "'");
+ if (current_command)
+ current_command->Lock();
+
+ lookup = false;
NickCore *nc = findcore(res.Get(0, "display"));
if (!nc)
nc = new NickCore(res.Get(0, "display"));
diff --git a/modules/extra/m_async_commands.cpp b/modules/extra/m_async_commands.cpp
new file mode 100644
index 000000000..ddc660ea8
--- /dev/null
+++ b/modules/extra/m_async_commands.cpp
@@ -0,0 +1,137 @@
+#include "module.h"
+#include "async_commands.h"
+
+static Pipe *me;
+static CommandMutex *current_command = NULL;
+static std::list<CommandMutex *> commands;
+/* Mutex held by the core when it is processing. Used by threads to halt the core */
+static Mutex main_mutex;
+
+class AsynchCommandMutex : public CommandMutex
+{
+ public:
+ AsynchCommandMutex() : CommandMutex()
+ {
+ commands.push_back(this);
+ current_command = this;
+ }
+
+ ~AsynchCommandMutex()
+ {
+ std::list<CommandMutex *>::iterator it = std::find(commands.begin(), commands.end(), this);
+ if (it != commands.end())
+ commands.erase(it);
+ if (this == current_command)
+ current_command = NULL;
+ }
+
+ void Run()
+ {
+ User *u = this->source.u;
+ BotInfo *bi = this->source.owner;
+
+ if (!command->permission.empty() && !u->Account()->HasCommand(command->permission))
+ {
+ u->SendMessage(bi, LanguageString::ACCESS_DENIED);
+ Log(LOG_COMMAND, "denied", bi) << "Access denied for user " << u->GetMask() << " with command " << command;
+ }
+ else
+ {
+ CommandReturn ret = command->Execute(source, params);
+
+ if (ret == MOD_CONT)
+ {
+ FOREACH_MOD(I_OnPostCommand, OnPostCommand(source, command, params));
+ }
+ }
+
+ main_mutex.Unlock();
+ }
+
+ void Lock()
+ {
+ this->processing = true;
+ me->Notify();
+ this->mutex.Lock();
+ }
+
+ void Unlock()
+ {
+ this->processing = false;
+ main_mutex.Unlock();
+ }
+};
+
+
+class ModuleAsynchCommands : public Module, public Pipe, public AsynchCommandsService
+{
+ public:
+ ModuleAsynchCommands(const Anope::string &modname, const Anope::string &creator) : Module(modname, creator), Pipe(), AsynchCommandsService(this, "asynch_commands")
+ {
+ me = this;
+
+ this->SetPermanent(true);
+
+ main_mutex.Lock();
+
+ Implementation i[] = { I_OnPreCommand };
+ ModuleManager::Attach(i, this, 1);
+
+ ModuleManager::RegisterService(this);
+ }
+
+ EventReturn OnPreCommand(CommandSource &source, Command *command, const std::vector<Anope::string> &params)
+ {
+ AsynchCommandMutex *cm = new AsynchCommandMutex();
+ try
+ {
+ cm->mutex.Lock();
+ cm->command = command;
+ cm->source = source;
+ cm->params = params;
+
+ // Give processing to the command thread
+ Log(LOG_DEBUG_2) << "Waiting for command thread " << cm->command->name << " from " << source.u->nick;
+ threadEngine.Start(cm);
+ main_mutex.Lock();
+
+ return EVENT_STOP;
+ }
+ catch (const CoreException &ex)
+ {
+ delete cm;
+ Log() << "Unable to thread for command: " << ex.GetReason();
+ }
+
+ return EVENT_CONTINUE;
+ }
+
+ void OnNotify()
+ {
+ for (std::list<CommandMutex *>::iterator it = commands.begin(), it_end = commands.end(); it != it_end; ++it)
+ {
+ CommandMutex *cm = *it;
+
+ // Thread engine will pick this up later
+ if (cm->GetExitState() || !cm->processing)
+ continue;
+
+ Log(LOG_DEBUG_2) << "Waiting for command thread " << cm->command->name << " from " << cm->source.u->nick;
+ current_command = cm;
+
+ // Unlock to give processing back to the command thread
+ cm->mutex.Unlock();
+ // Relock to regain processing once the command thread hangs for any reason
+ main_mutex.Lock();
+
+ current_command = NULL;
+ }
+ }
+
+ CommandMutex *CurrentCommand()
+ {
+ return current_command;
+ }
+};
+
+MODULE_INIT(ModuleAsynchCommands)
diff --git a/src/threadengine.cpp b/src/threadengine.cpp
index 0ac049c38..c2aa3da20 100644
--- a/src/threadengine.cpp
+++ b/src/threadengine.cpp
@@ -20,7 +20,7 @@ void ThreadEngine::Process()
/** Threads constructor
*/
-Thread::Thread() : Exit(false)
+Thread::Thread() : exit(false)
{
threadEngine.threads.push_back(this);
}
@@ -41,7 +41,7 @@ Thread::~Thread()
*/
void Thread::SetExitState()
{
- Exit = true;
+ exit = true;
}
/** Returns the exit state of the thread
@@ -49,7 +49,7 @@ void Thread::SetExitState()
*/
bool Thread::GetExitState() const
{
- return Exit;
+ return exit;
}
/** Called to run the thread, should be overloaded
diff --git a/src/threadengines/threadengine_pthread.cpp b/src/threadengines/threadengine_pthread.cpp
index acdccfeb5..e8242bb82 100644
--- a/src/threadengines/threadengine_pthread.cpp
+++ b/src/threadengines/threadengine_pthread.cpp
@@ -39,6 +39,13 @@ void Thread::Join()
pthread_join(Handle, NULL);
}
+/** Exit the thread. Note that the thread still must be joined to free resources!
+ */
+void Thread::Exit()
+{
+ pthread_exit(0);
+}
+
/** Start a new thread
* @param thread A pointer to a newley allocated thread
*/
diff --git a/src/threadengines/threadengine_win32.cpp b/src/threadengines/threadengine_win32.cpp
index e6ad725bf..218f711c4 100644
--- a/src/threadengines/threadengine_win32.cpp
+++ b/src/threadengines/threadengine_win32.cpp
@@ -31,6 +31,13 @@ void Thread::Join()
WaitForSingleObject(Handle, INFINITE);
}
+/** Exit the thread. Note that the thread still must be joined to free resources!
+ */
+void Thread::Exit()
+{
+ ExitThread(0);
+}
+
/** Start a new thread
* @param thread A pointer to a newley allocated thread
*/