summaryrefslogtreecommitdiff
path: root/modules
diff options
context:
space:
mode:
Diffstat (limited to 'modules')
-rw-r--r--modules/extra/async_commands.h4
-rw-r--r--modules/extra/db_mysql_live.cpp355
-rw-r--r--modules/extra/m_async_commands.cpp95
-rw-r--r--modules/extra/m_mysql.cpp17
4 files changed, 358 insertions, 113 deletions
diff --git a/modules/extra/async_commands.h b/modules/extra/async_commands.h
index b6b9e7c2b..a2f650bae 100644
--- a/modules/extra/async_commands.h
+++ b/modules/extra/async_commands.h
@@ -6,11 +6,11 @@ class CommandMutex : public Thread
Mutex mutex;
// Set to true when this thread is processing data that is not thread safe (eg, the command)
bool processing;
- Command *command;
CommandSource source;
+ Command *command;
std::vector<Anope::string> params;
- CommandMutex() : Thread(), processing(true) { }
+ CommandMutex(CommandSource &s, Command *c, const std::vector<Anope::string> &p) : Thread(), processing(true), source(s), command(c), params(p) { }
~CommandMutex() { }
diff --git a/modules/extra/db_mysql_live.cpp b/modules/extra/db_mysql_live.cpp
index 8f14f3998..c5e645be6 100644
--- a/modules/extra/db_mysql_live.cpp
+++ b/modules/extra/db_mysql_live.cpp
@@ -2,59 +2,51 @@
#include "async_commands.h"
#include "sql.h"
-class MySQLLiveModule : public Module
+class SQLCache : public Timer
{
- service_reference<SQLProvider> SQL;
- service_reference<AsynchCommandsService> ACS;
+ typedef std::map<Anope::string, time_t, std::less<ci::string> > cache_map;
+ cache_map cache;
+ public:
- SQLResult RunQuery(const Anope::string &query)
+ SQLCache() : Timer(300, Anope::CurTime, true) { }
+
+ bool Check(const Anope::string &item)
{
- if (!this->SQL)
- throw SQLException("Unable to locate SQL reference, is m_mysql loaded and configured correctly?");
+ cache_map::iterator it = this->cache.find(item);
+ if (it != this->cache.end() && Anope::CurTime - it->second < 5)
+ return true;
- return SQL->RunQuery(query);
+ this->cache[item] = Anope::CurTime;
+ return false;
}
- const Anope::string Escape(const Anope::string &query)
+ void Tick(time_t)
{
- return SQL ? SQL->Escape(query) : query;
- }
+ for (cache_map::iterator it = this->cache.begin(), next_it; it != this->cache.end(); it = next_it)
+ {
+ next_it = it;
+ ++next_it;
- CommandMutex *CurrentCommand()
- {
- if (this->ACS)
- return this->ACS->CurrentCommand();
- return NULL;
+ if (Anope::CurTime - it->second > 5)
+ this->cache.erase(it);
+ }
}
+};
+class BotInfoUpdater : public SQLInterface, public SQLCache
+{
public:
- MySQLLiveModule(const Anope::string &modname, const Anope::string &creator) :
- Module(modname, creator), SQL("mysql/main"), ACS("asynch_commands")
+ BotInfoUpdater(Module *m) : SQLInterface(m) { }
+
+ void OnResult(const SQLResult &r)
{
- Implementation i[] = { I_OnFindBot, I_OnFindChan, I_OnFindNick, I_OnFindCore };
- ModuleManager::Attach(i, this, 4);
+ BotInfoUpdater::Process(r);
}
- void OnFindBot(const Anope::string &nick)
+ static void Process(const SQLResult &res)
{
- static bool lookup = true;
- if (lookup == false)
- {
- lookup = true;
- return;
- }
-
try
{
- CommandMutex *current_command = this->CurrentCommand();
-
- if (current_command)
- current_command->Unlock();
- SQLResult res = this->RunQuery("SELECT * FROM `anope_bs_core` WHERE `nick` = '" + this->Escape(nick) + "'");
- if (current_command)
- current_command->Lock();
-
- lookup = false;
BotInfo *bi = findbot(res.Get(0, "nick"));
if (!bi)
bi = new BotInfo(res.Get(0, "nick"), res.Get(0, "user"), res.Get(0, "host"), res.Get(0, "rname"));
@@ -70,30 +62,28 @@ class MySQLLiveModule : public Module
bi->created = convertTo<time_t>(res.Get(0, "created"));
bi->chancount = convertTo<uint32>(res.Get(0, "chancount"));
}
- catch (const SQLException &) { }
+ catch (const SQLException &ex)
+ {
+ Log(LOG_DEBUG) << ex.GetReason();
+ }
catch (const ConvertException &) { }
}
+};
- void OnFindChan(const Anope::string &chname)
+class ChanInfoUpdater : public SQLInterface, public SQLCache
+{
+ public:
+ ChanInfoUpdater(Module *m) : SQLInterface(m) { }
+
+ void OnResult(const SQLResult &r)
{
- static bool lookup = true;
- if (lookup == false)
- {
- lookup = true;
- return;
- }
+ ChanInfoUpdater::Process(r);
+ }
+ static void Process(const SQLResult &res)
+ {
try
{
- CommandMutex *current_command = this->CurrentCommand();
-
- if (current_command)
- current_command->Unlock();
- SQLResult res = this->RunQuery("SELECT * FROM `anope_cs_info` WHERE `name` = '" + this->Escape(chname) + "'");
- if (current_command)
- current_command->Lock();
-
- lookup = false;
ChannelInfo *ci = cs_findchan(res.Get(0, "name"));
if (!ci)
ci = new ChannelInfo(res.Get(0, "name"));
@@ -159,33 +149,31 @@ class MySQLLiveModule : public Module
if (ci->c)
check_modes(ci->c);
}
- catch (const SQLException &) { }
+ catch (const SQLException &ex)
+ {
+ Log(LOG_DEBUG) << ex.GetReason();
+ }
catch (const ConvertException &) { }
}
+};
- void OnFindNick(const Anope::string &nick)
+class NickInfoUpdater : public SQLInterface, public SQLCache
+{
+ public:
+ NickInfoUpdater(Module *m) : SQLInterface(m) { }
+
+ void OnResult(const SQLResult &r)
{
- static bool lookup = true;
- if (lookup == false)
- {
- lookup = true;
- return;
- }
+ NickInfoUpdater::Process(r);
+ }
+ static void Process(const SQLResult &res)
+ {
try
{
- CommandMutex *current_command = this->CurrentCommand();
-
- if (current_command)
- current_command->Unlock();
- SQLResult res = this->RunQuery("SELECT * FROM `anope_ns_alias` WHERE `nick` = '" + this->Escape(nick) + "'");
- if (current_command)
- current_command->Lock();
-
NickCore *nc = findcore(res.Get(0, "display"));
if (!nc)
return;
- lookup = false;
NickAlias *na = findnick(res.Get(0, "nick"));
if (!na)
na = new NickAlias(res.Get(0, "nick"), nc);
@@ -208,30 +196,28 @@ class MySQLLiveModule : public Module
na->nc->aliases.push_back(na);
}
}
- catch (const SQLException &) { }
+ catch (const SQLException &ex)
+ {
+ Log(LOG_DEBUG) << ex.GetReason();
+ }
catch (const ConvertException &) { }
}
+};
- void OnFindCore(const Anope::string &nick)
+class NickCoreUpdater : public SQLInterface, public SQLCache
+{
+ public:
+ NickCoreUpdater(Module *m) : SQLInterface(m) { }
+
+ void OnResult(const SQLResult &r)
{
- static bool lookup = true;
- if (lookup == false)
- {
- lookup = true;
- return;
- }
+ NickCoreUpdater::Process(r);
+ }
+ static void Process(const SQLResult &res)
+ {
try
{
- CommandMutex *current_command = this->CurrentCommand();
-
- if (current_command)
- current_command->Unlock();
- SQLResult res = this->RunQuery("SELECT * FROM `anope_ns_core` WHERE `name` = '" + this->Escape(nick) + "'");
- if (current_command)
- current_command->Lock();
-
- lookup = false;
NickCore *nc = findcore(res.Get(0, "display"));
if (!nc)
nc = new NickCore(res.Get(0, "display"));
@@ -243,9 +229,202 @@ class MySQLLiveModule : public Module
nc->FromString(BuildStringVector(res.Get(0, "flags")));
nc->language = res.Get(0, "language");
}
- catch (const SQLException &) { }
+ catch (const SQLException &ex)
+ {
+ Log(LOG_DEBUG) << ex.GetReason();
+ }
catch (const ConvertException &) { }
}
};
+class MySQLLiveModule : public Module
+{
+ service_reference<SQLProvider> SQL;
+ service_reference<AsynchCommandsService> ACS;
+
+ BotInfoUpdater botinfoupdater;
+ ChanInfoUpdater chaninfoupdater;
+ NickInfoUpdater nickinfoupdater;
+ NickCoreUpdater nickcoreupdater;
+
+ SQLResult RunQuery(const Anope::string &query)
+ {
+ if (!this->SQL)
+ throw SQLException("Unable to locate SQL reference, is m_mysql loaded and configured correctly?");
+
+ SQLResult res = SQL->RunQuery(query);
+ if (!res.GetError().empty())
+ throw SQLException(res.GetError());
+ return res;
+ }
+
+ void RunQuery(SQLInterface *i, const Anope::string &query)
+ {
+ if (!this->SQL)
+ throw SQLException("Unable to locate SQL reference, is m_mysql loaded and configured correctly?");
+
+ return SQL->Run(i, query);
+ }
+
+ const Anope::string Escape(const Anope::string &query)
+ {
+ return SQL ? SQL->Escape(query) : query;
+ }
+
+ CommandMutex *CurrentCommand()
+ {
+ if (this->ACS)
+ return this->ACS->CurrentCommand();
+ return NULL;
+ }
+
+ public:
+ MySQLLiveModule(const Anope::string &modname, const Anope::string &creator) :
+ Module(modname, creator), SQL("mysql/main"), ACS("asynch_commands"), botinfoupdater(this),
+ chaninfoupdater(this), nickinfoupdater(this), nickcoreupdater(this)
+ {
+ Implementation i[] = { I_OnFindBot, I_OnFindChan, I_OnFindNick, I_OnFindCore, I_OnPreShutdown };
+ ModuleManager::Attach(i, this, 5);
+ }
+
+ void OnPreShutdown()
+ {
+ Implementation i[] = { I_OnFindBot, I_OnFindChan, I_OnFindNick, I_OnFindCore };
+ for (size_t j = 0; j < 4; ++j)
+ ModuleManager::Detach(i[j], this);
+ }
+
+ void OnFindBot(const Anope::string &nick)
+ {
+ if (botinfoupdater.Check(nick))
+ return;
+
+ try
+ {
+ Anope::string query = "SELECT * FROM `anope_bs_core` WHERE `nick` = '" + this->Escape(nick) + "'";
+ CommandMutex *current_command = this->CurrentCommand();
+ if (current_command)
+ {
+ current_command->Unlock();
+ try
+ {
+ SQLResult res = this->RunQuery(query);
+ current_command->Lock();
+ BotInfoUpdater::Process(res);
+ }
+ catch (const SQLException &ex)
+ {
+ current_command->Lock();
+ throw;
+ }
+ }
+ else
+ this->RunQuery(&botinfoupdater, query);
+ }
+ catch (const SQLException &ex)
+ {
+ Log(LOG_DEBUG) << "OnBotChan: " << ex.GetReason();
+ }
+ }
+
+ void OnFindChan(const Anope::string &chname)
+ {
+ if (chaninfoupdater.Check(chname))
+ return;
+
+ try
+ {
+ Anope::string query = "SELECT * FROM `anope_cs_info` WHERE `name` = '" + this->Escape(chname) + "'";
+ CommandMutex *current_command = this->CurrentCommand();
+ if (current_command)
+ {
+ current_command->Unlock();
+ try
+ {
+ SQLResult res = this->RunQuery(query);
+ current_command->Lock();
+ ChanInfoUpdater::Process(res);
+ }
+ catch (const SQLException &)
+ {
+ current_command->Lock();
+ throw;
+ }
+ }
+ else
+ this->RunQuery(&chaninfoupdater, query);
+ }
+ catch (const SQLException &ex)
+ {
+ Log(LOG_DEBUG) << "OnFindChan: " << ex.GetReason();
+ }
+ }
+
+ void OnFindNick(const Anope::string &nick)
+ {
+ if (nickinfoupdater.Check(nick))
+ return;
+
+ try
+ {
+ Anope::string query = "SELECT * FROM `anope_ns_alias` WHERE `nick` = '" + this->Escape(nick) + "'";
+ CommandMutex *current_command = this->CurrentCommand();
+ if (current_command)
+ {
+ current_command->Unlock();
+ try
+ {
+ SQLResult res = this->RunQuery(query);
+ current_command->Lock();
+ NickInfoUpdater::Process(res);
+ }
+ catch (const SQLException &)
+ {
+ current_command->Lock();
+ throw;
+ }
+ }
+ else
+ this->RunQuery(&nickinfoupdater, query);
+ }
+ catch (const SQLException &ex)
+ {
+ Log(LOG_DEBUG) << "OnFindNick: " << ex.GetReason();
+ }
+ }
+
+ void OnFindCore(const Anope::string &nick)
+ {
+ if (nickcoreupdater.Check(nick))
+ return;
+
+ try
+ {
+ Anope::string query = "SELECT * FROM `anope_ns_core` WHERE `display` = '" + this->Escape(nick) + "'";
+ CommandMutex *current_command = this->CurrentCommand();
+ if (current_command)
+ {
+ current_command->Unlock();
+ try
+ {
+ SQLResult res = this->RunQuery(query);
+ current_command->Lock();
+ NickCoreUpdater::Process(res);
+ }
+ catch (const SQLException &)
+ {
+ current_command->Lock();
+ throw;
+ }
+ }
+ else
+ this->RunQuery(&nickcoreupdater, query);
+ }
+ catch (const SQLException &ex)
+ {
+ Log(LOG_DEBUG) << "OnFindCore: " << ex.GetReason();
+ }
+ }
+};
+
MODULE_INIT(MySQLLiveModule)
diff --git a/modules/extra/m_async_commands.cpp b/modules/extra/m_async_commands.cpp
index ddc660ea8..4f251431c 100644
--- a/modules/extra/m_async_commands.cpp
+++ b/modules/extra/m_async_commands.cpp
@@ -9,11 +9,15 @@ static Mutex main_mutex;
class AsynchCommandMutex : public CommandMutex
{
+ bool destroy;
public:
- AsynchCommandMutex() : CommandMutex()
+ bool started;
+
+ AsynchCommandMutex(CommandSource &s, Command *c, const std::vector<Anope::string> &p) : CommandMutex(s, c, p), destroy(false), started(false)
{
commands.push_back(this);
- current_command = this;
+
+ this->mutex.Lock();
}
~AsynchCommandMutex()
@@ -27,6 +31,8 @@ class AsynchCommandMutex : public CommandMutex
void Run()
{
+ this->started = true;
+
User *u = this->source.u;
BotInfo *bi = this->source.owner;
@@ -43,6 +49,8 @@ class AsynchCommandMutex : public CommandMutex
{
FOREACH_MOD(I_OnPostCommand, OnPostCommand(source, command, params));
}
+
+ source.DoReply();
}
main_mutex.Unlock();
@@ -50,6 +58,11 @@ class AsynchCommandMutex : public CommandMutex
void Lock()
{
+ if (this->destroy)
+ {
+ this->Exit();
+ }
+
this->processing = true;
me->Notify();
this->mutex.Lock();
@@ -60,13 +73,34 @@ class AsynchCommandMutex : public CommandMutex
this->processing = false;
main_mutex.Unlock();
}
+
+ void Destroy()
+ {
+ this->destroy = true;
+ }
};
class ModuleAsynchCommands : public Module, public Pipe, public AsynchCommandsService
{
+ bool reset;
+
+ void Reset()
+ {
+ this->reset = false;
+
+ unsigned count = 0, size = commands.size();
+ for (std::list<CommandMutex *>::iterator it = commands.begin(); count < size; ++count, ++it)
+ {
+ AsynchCommandMutex *cm = debug_cast<AsynchCommandMutex *>(*it);
+ cm->Destroy();
+
+ new AsynchCommandMutex(cm->source, cm->command, cm->params);
+ }
+ }
+
public:
- ModuleAsynchCommands(const Anope::string &modname, const Anope::string &creator) : Module(modname, creator), Pipe(), AsynchCommandsService(this, "asynch_commands")
+ ModuleAsynchCommands(const Anope::string &modname, const Anope::string &creator) : Module(modname, creator), Pipe(), AsynchCommandsService(this, "asynch_commands"), reset(false)
{
me = this;
@@ -74,26 +108,40 @@ class ModuleAsynchCommands : public Module, public Pipe, public AsynchCommandsSe
main_mutex.Lock();
- Implementation i[] = { I_OnPreCommand };
- ModuleManager::Attach(i, this, 1);
+ Implementation i[] = { I_OnObjectDestroy, I_OnPreCommand };
+ ModuleManager::Attach(i, this, 2);
ModuleManager::RegisterService(this);
}
+
+ void OnObjectDestroy(Base *b)
+ {
+ for (std::list<CommandMutex *>::iterator it = commands.begin(), it_end = commands.end(); it != it_end; ++it)
+ {
+ AsynchCommandMutex *cm = debug_cast<AsynchCommandMutex *>(*it);
+
+ if (cm->started && (cm->command == b || cm->source.u == b || cm->source.owner == b || cm->source.service == b || cm->source.ci == b))
+ cm->Destroy();
+ }
+
+ if (current_command == NULL)
+ this->Reset();
+ else
+ this->reset = true;
+ }
EventReturn OnPreCommand(CommandSource &source, Command *command, const std::vector<Anope::string> &params)
{
- AsynchCommandMutex *cm = new AsynchCommandMutex();
+ AsynchCommandMutex *cm = new AsynchCommandMutex(source, command, params);
+
try
{
- cm->mutex.Lock();
- cm->command = command;
- cm->source = source;
- cm->params = params;
-
// Give processing to the command thread
Log(LOG_DEBUG_2) << "Waiting for command thread " << cm->command->name << " from " << source.u->nick;
+ current_command = cm;
threadEngine.Start(cm);
main_mutex.Lock();
+ current_command = NULL;
return EVENT_STOP;
}
@@ -108,9 +156,9 @@ class ModuleAsynchCommands : public Module, public Pipe, public AsynchCommandsSe
void OnNotify()
{
- for (std::list<CommandMutex *>::iterator it = commands.begin(), it_end = commands.end(); it != it_end; ++it)
+ for (std::list<CommandMutex *>::iterator it = commands.begin(), it_end = commands.end(); it != it_end;)
{
- CommandMutex *cm = *it;
+ AsynchCommandMutex *cm = debug_cast<AsynchCommandMutex *>(*it++);
// Thread engine will pick this up later
if (cm->GetExitState() || !cm->processing)
@@ -120,11 +168,30 @@ class ModuleAsynchCommands : public Module, public Pipe, public AsynchCommandsSe
current_command = cm;
// Unlock to give processing back to the command thread
- cm->mutex.Unlock();
+ if (!cm->started)
+ {
+ try
+ {
+ threadEngine.Start(cm);
+ }
+ catch (const CoreException &)
+ {
+ delete cm;
+ continue;
+ }
+ }
+ else
+ cm->mutex.Unlock();
// Relock to regain processing once the command thread hangs for any reason
main_mutex.Lock();
current_command = NULL;
+
+ if (this->reset)
+ {
+ this->Reset();
+ return this->OnNotify();
+ }
}
}
diff --git a/modules/extra/m_mysql.cpp b/modules/extra/m_mysql.cpp
index 12b4dbcc5..7ef4243e6 100644
--- a/modules/extra/m_mysql.cpp
+++ b/modules/extra/m_mysql.cpp
@@ -50,16 +50,13 @@ class MySQLResult : public SQLResult
public:
MySQLResult(const Anope::string &q, MYSQL_RES *r) : SQLResult(q), res(r)
{
- if (!res)
- return;
+ unsigned num_fields = res ? mysql_num_fields(res) : 0;
- unsigned num_fields = mysql_num_fields(res);
+ Log(LOG_DEBUG) << "SQL query " << q << " returned " << num_fields << " fields";
if (!num_fields)
return;
- Log(LOG_DEBUG) << "SQL query returned " << num_fields << " fields";
-
for (MYSQL_ROW row; (row = mysql_fetch_row(res));)
{
MYSQL_FIELD *fields = mysql_fetch_fields(res);
@@ -268,6 +265,8 @@ class ModuleSQL : public Module
}
this->DThread->Unlock();
+
+ this->SQLPipe->OnNotify();
}
};
@@ -398,8 +397,11 @@ void DispatcherThread::Run()
void MySQLPipe::OnNotify()
{
me->DThread->Lock();
+ std::deque<QueryResult> finishedRequests = me->FinishedRequests;
+ me->FinishedRequests.clear();
+ me->DThread->Unlock();
- for (std::deque<QueryResult>::const_iterator it = me->FinishedRequests.begin(), it_end = me->FinishedRequests.end(); it != it_end; ++it)
+ for (std::deque<QueryResult>::const_iterator it = finishedRequests.begin(), it_end = finishedRequests.end(); it != it_end; ++it)
{
const QueryResult &qr = *it;
@@ -411,9 +413,6 @@ void MySQLPipe::OnNotify()
else
qr.sqlinterface->OnError(qr.result);
}
- me->FinishedRequests.clear();
-
- me->DThread->Unlock();
}
MODULE_INIT(ModuleSQL)