diff options
Diffstat (limited to 'modules')
-rw-r--r-- | modules/extra/async_commands.h | 4 | ||||
-rw-r--r-- | modules/extra/db_mysql_live.cpp | 355 | ||||
-rw-r--r-- | modules/extra/m_async_commands.cpp | 95 | ||||
-rw-r--r-- | modules/extra/m_mysql.cpp | 17 |
4 files changed, 358 insertions, 113 deletions
diff --git a/modules/extra/async_commands.h b/modules/extra/async_commands.h index b6b9e7c2b..a2f650bae 100644 --- a/modules/extra/async_commands.h +++ b/modules/extra/async_commands.h @@ -6,11 +6,11 @@ class CommandMutex : public Thread Mutex mutex; // Set to true when this thread is processing data that is not thread safe (eg, the command) bool processing; - Command *command; CommandSource source; + Command *command; std::vector<Anope::string> params; - CommandMutex() : Thread(), processing(true) { } + CommandMutex(CommandSource &s, Command *c, const std::vector<Anope::string> &p) : Thread(), processing(true), source(s), command(c), params(p) { } ~CommandMutex() { } diff --git a/modules/extra/db_mysql_live.cpp b/modules/extra/db_mysql_live.cpp index 8f14f3998..c5e645be6 100644 --- a/modules/extra/db_mysql_live.cpp +++ b/modules/extra/db_mysql_live.cpp @@ -2,59 +2,51 @@ #include "async_commands.h" #include "sql.h" -class MySQLLiveModule : public Module +class SQLCache : public Timer { - service_reference<SQLProvider> SQL; - service_reference<AsynchCommandsService> ACS; + typedef std::map<Anope::string, time_t, std::less<ci::string> > cache_map; + cache_map cache; + public: - SQLResult RunQuery(const Anope::string &query) + SQLCache() : Timer(300, Anope::CurTime, true) { } + + bool Check(const Anope::string &item) { - if (!this->SQL) - throw SQLException("Unable to locate SQL reference, is m_mysql loaded and configured correctly?"); + cache_map::iterator it = this->cache.find(item); + if (it != this->cache.end() && Anope::CurTime - it->second < 5) + return true; - return SQL->RunQuery(query); + this->cache[item] = Anope::CurTime; + return false; } - const Anope::string Escape(const Anope::string &query) + void Tick(time_t) { - return SQL ? SQL->Escape(query) : query; - } + for (cache_map::iterator it = this->cache.begin(), next_it; it != this->cache.end(); it = next_it) + { + next_it = it; + ++next_it; - CommandMutex *CurrentCommand() - { - if (this->ACS) - return this->ACS->CurrentCommand(); - return NULL; + if (Anope::CurTime - it->second > 5) + this->cache.erase(it); + } } +}; +class BotInfoUpdater : public SQLInterface, public SQLCache +{ public: - MySQLLiveModule(const Anope::string &modname, const Anope::string &creator) : - Module(modname, creator), SQL("mysql/main"), ACS("asynch_commands") + BotInfoUpdater(Module *m) : SQLInterface(m) { } + + void OnResult(const SQLResult &r) { - Implementation i[] = { I_OnFindBot, I_OnFindChan, I_OnFindNick, I_OnFindCore }; - ModuleManager::Attach(i, this, 4); + BotInfoUpdater::Process(r); } - void OnFindBot(const Anope::string &nick) + static void Process(const SQLResult &res) { - static bool lookup = true; - if (lookup == false) - { - lookup = true; - return; - } - try { - CommandMutex *current_command = this->CurrentCommand(); - - if (current_command) - current_command->Unlock(); - SQLResult res = this->RunQuery("SELECT * FROM `anope_bs_core` WHERE `nick` = '" + this->Escape(nick) + "'"); - if (current_command) - current_command->Lock(); - - lookup = false; BotInfo *bi = findbot(res.Get(0, "nick")); if (!bi) bi = new BotInfo(res.Get(0, "nick"), res.Get(0, "user"), res.Get(0, "host"), res.Get(0, "rname")); @@ -70,30 +62,28 @@ class MySQLLiveModule : public Module bi->created = convertTo<time_t>(res.Get(0, "created")); bi->chancount = convertTo<uint32>(res.Get(0, "chancount")); } - catch (const SQLException &) { } + catch (const SQLException &ex) + { + Log(LOG_DEBUG) << ex.GetReason(); + } catch (const ConvertException &) { } } +}; - void OnFindChan(const Anope::string &chname) +class ChanInfoUpdater : public SQLInterface, public SQLCache +{ + public: + ChanInfoUpdater(Module *m) : SQLInterface(m) { } + + void OnResult(const SQLResult &r) { - static bool lookup = true; - if (lookup == false) - { - lookup = true; - return; - } + ChanInfoUpdater::Process(r); + } + static void Process(const SQLResult &res) + { try { - CommandMutex *current_command = this->CurrentCommand(); - - if (current_command) - current_command->Unlock(); - SQLResult res = this->RunQuery("SELECT * FROM `anope_cs_info` WHERE `name` = '" + this->Escape(chname) + "'"); - if (current_command) - current_command->Lock(); - - lookup = false; ChannelInfo *ci = cs_findchan(res.Get(0, "name")); if (!ci) ci = new ChannelInfo(res.Get(0, "name")); @@ -159,33 +149,31 @@ class MySQLLiveModule : public Module if (ci->c) check_modes(ci->c); } - catch (const SQLException &) { } + catch (const SQLException &ex) + { + Log(LOG_DEBUG) << ex.GetReason(); + } catch (const ConvertException &) { } } +}; - void OnFindNick(const Anope::string &nick) +class NickInfoUpdater : public SQLInterface, public SQLCache +{ + public: + NickInfoUpdater(Module *m) : SQLInterface(m) { } + + void OnResult(const SQLResult &r) { - static bool lookup = true; - if (lookup == false) - { - lookup = true; - return; - } + NickInfoUpdater::Process(r); + } + static void Process(const SQLResult &res) + { try { - CommandMutex *current_command = this->CurrentCommand(); - - if (current_command) - current_command->Unlock(); - SQLResult res = this->RunQuery("SELECT * FROM `anope_ns_alias` WHERE `nick` = '" + this->Escape(nick) + "'"); - if (current_command) - current_command->Lock(); - NickCore *nc = findcore(res.Get(0, "display")); if (!nc) return; - lookup = false; NickAlias *na = findnick(res.Get(0, "nick")); if (!na) na = new NickAlias(res.Get(0, "nick"), nc); @@ -208,30 +196,28 @@ class MySQLLiveModule : public Module na->nc->aliases.push_back(na); } } - catch (const SQLException &) { } + catch (const SQLException &ex) + { + Log(LOG_DEBUG) << ex.GetReason(); + } catch (const ConvertException &) { } } +}; - void OnFindCore(const Anope::string &nick) +class NickCoreUpdater : public SQLInterface, public SQLCache +{ + public: + NickCoreUpdater(Module *m) : SQLInterface(m) { } + + void OnResult(const SQLResult &r) { - static bool lookup = true; - if (lookup == false) - { - lookup = true; - return; - } + NickCoreUpdater::Process(r); + } + static void Process(const SQLResult &res) + { try { - CommandMutex *current_command = this->CurrentCommand(); - - if (current_command) - current_command->Unlock(); - SQLResult res = this->RunQuery("SELECT * FROM `anope_ns_core` WHERE `name` = '" + this->Escape(nick) + "'"); - if (current_command) - current_command->Lock(); - - lookup = false; NickCore *nc = findcore(res.Get(0, "display")); if (!nc) nc = new NickCore(res.Get(0, "display")); @@ -243,9 +229,202 @@ class MySQLLiveModule : public Module nc->FromString(BuildStringVector(res.Get(0, "flags"))); nc->language = res.Get(0, "language"); } - catch (const SQLException &) { } + catch (const SQLException &ex) + { + Log(LOG_DEBUG) << ex.GetReason(); + } catch (const ConvertException &) { } } }; +class MySQLLiveModule : public Module +{ + service_reference<SQLProvider> SQL; + service_reference<AsynchCommandsService> ACS; + + BotInfoUpdater botinfoupdater; + ChanInfoUpdater chaninfoupdater; + NickInfoUpdater nickinfoupdater; + NickCoreUpdater nickcoreupdater; + + SQLResult RunQuery(const Anope::string &query) + { + if (!this->SQL) + throw SQLException("Unable to locate SQL reference, is m_mysql loaded and configured correctly?"); + + SQLResult res = SQL->RunQuery(query); + if (!res.GetError().empty()) + throw SQLException(res.GetError()); + return res; + } + + void RunQuery(SQLInterface *i, const Anope::string &query) + { + if (!this->SQL) + throw SQLException("Unable to locate SQL reference, is m_mysql loaded and configured correctly?"); + + return SQL->Run(i, query); + } + + const Anope::string Escape(const Anope::string &query) + { + return SQL ? SQL->Escape(query) : query; + } + + CommandMutex *CurrentCommand() + { + if (this->ACS) + return this->ACS->CurrentCommand(); + return NULL; + } + + public: + MySQLLiveModule(const Anope::string &modname, const Anope::string &creator) : + Module(modname, creator), SQL("mysql/main"), ACS("asynch_commands"), botinfoupdater(this), + chaninfoupdater(this), nickinfoupdater(this), nickcoreupdater(this) + { + Implementation i[] = { I_OnFindBot, I_OnFindChan, I_OnFindNick, I_OnFindCore, I_OnPreShutdown }; + ModuleManager::Attach(i, this, 5); + } + + void OnPreShutdown() + { + Implementation i[] = { I_OnFindBot, I_OnFindChan, I_OnFindNick, I_OnFindCore }; + for (size_t j = 0; j < 4; ++j) + ModuleManager::Detach(i[j], this); + } + + void OnFindBot(const Anope::string &nick) + { + if (botinfoupdater.Check(nick)) + return; + + try + { + Anope::string query = "SELECT * FROM `anope_bs_core` WHERE `nick` = '" + this->Escape(nick) + "'"; + CommandMutex *current_command = this->CurrentCommand(); + if (current_command) + { + current_command->Unlock(); + try + { + SQLResult res = this->RunQuery(query); + current_command->Lock(); + BotInfoUpdater::Process(res); + } + catch (const SQLException &ex) + { + current_command->Lock(); + throw; + } + } + else + this->RunQuery(&botinfoupdater, query); + } + catch (const SQLException &ex) + { + Log(LOG_DEBUG) << "OnBotChan: " << ex.GetReason(); + } + } + + void OnFindChan(const Anope::string &chname) + { + if (chaninfoupdater.Check(chname)) + return; + + try + { + Anope::string query = "SELECT * FROM `anope_cs_info` WHERE `name` = '" + this->Escape(chname) + "'"; + CommandMutex *current_command = this->CurrentCommand(); + if (current_command) + { + current_command->Unlock(); + try + { + SQLResult res = this->RunQuery(query); + current_command->Lock(); + ChanInfoUpdater::Process(res); + } + catch (const SQLException &) + { + current_command->Lock(); + throw; + } + } + else + this->RunQuery(&chaninfoupdater, query); + } + catch (const SQLException &ex) + { + Log(LOG_DEBUG) << "OnFindChan: " << ex.GetReason(); + } + } + + void OnFindNick(const Anope::string &nick) + { + if (nickinfoupdater.Check(nick)) + return; + + try + { + Anope::string query = "SELECT * FROM `anope_ns_alias` WHERE `nick` = '" + this->Escape(nick) + "'"; + CommandMutex *current_command = this->CurrentCommand(); + if (current_command) + { + current_command->Unlock(); + try + { + SQLResult res = this->RunQuery(query); + current_command->Lock(); + NickInfoUpdater::Process(res); + } + catch (const SQLException &) + { + current_command->Lock(); + throw; + } + } + else + this->RunQuery(&nickinfoupdater, query); + } + catch (const SQLException &ex) + { + Log(LOG_DEBUG) << "OnFindNick: " << ex.GetReason(); + } + } + + void OnFindCore(const Anope::string &nick) + { + if (nickcoreupdater.Check(nick)) + return; + + try + { + Anope::string query = "SELECT * FROM `anope_ns_core` WHERE `display` = '" + this->Escape(nick) + "'"; + CommandMutex *current_command = this->CurrentCommand(); + if (current_command) + { + current_command->Unlock(); + try + { + SQLResult res = this->RunQuery(query); + current_command->Lock(); + NickCoreUpdater::Process(res); + } + catch (const SQLException &) + { + current_command->Lock(); + throw; + } + } + else + this->RunQuery(&nickcoreupdater, query); + } + catch (const SQLException &ex) + { + Log(LOG_DEBUG) << "OnFindCore: " << ex.GetReason(); + } + } +}; + MODULE_INIT(MySQLLiveModule) diff --git a/modules/extra/m_async_commands.cpp b/modules/extra/m_async_commands.cpp index ddc660ea8..4f251431c 100644 --- a/modules/extra/m_async_commands.cpp +++ b/modules/extra/m_async_commands.cpp @@ -9,11 +9,15 @@ static Mutex main_mutex; class AsynchCommandMutex : public CommandMutex { + bool destroy; public: - AsynchCommandMutex() : CommandMutex() + bool started; + + AsynchCommandMutex(CommandSource &s, Command *c, const std::vector<Anope::string> &p) : CommandMutex(s, c, p), destroy(false), started(false) { commands.push_back(this); - current_command = this; + + this->mutex.Lock(); } ~AsynchCommandMutex() @@ -27,6 +31,8 @@ class AsynchCommandMutex : public CommandMutex void Run() { + this->started = true; + User *u = this->source.u; BotInfo *bi = this->source.owner; @@ -43,6 +49,8 @@ class AsynchCommandMutex : public CommandMutex { FOREACH_MOD(I_OnPostCommand, OnPostCommand(source, command, params)); } + + source.DoReply(); } main_mutex.Unlock(); @@ -50,6 +58,11 @@ class AsynchCommandMutex : public CommandMutex void Lock() { + if (this->destroy) + { + this->Exit(); + } + this->processing = true; me->Notify(); this->mutex.Lock(); @@ -60,13 +73,34 @@ class AsynchCommandMutex : public CommandMutex this->processing = false; main_mutex.Unlock(); } + + void Destroy() + { + this->destroy = true; + } }; class ModuleAsynchCommands : public Module, public Pipe, public AsynchCommandsService { + bool reset; + + void Reset() + { + this->reset = false; + + unsigned count = 0, size = commands.size(); + for (std::list<CommandMutex *>::iterator it = commands.begin(); count < size; ++count, ++it) + { + AsynchCommandMutex *cm = debug_cast<AsynchCommandMutex *>(*it); + cm->Destroy(); + + new AsynchCommandMutex(cm->source, cm->command, cm->params); + } + } + public: - ModuleAsynchCommands(const Anope::string &modname, const Anope::string &creator) : Module(modname, creator), Pipe(), AsynchCommandsService(this, "asynch_commands") + ModuleAsynchCommands(const Anope::string &modname, const Anope::string &creator) : Module(modname, creator), Pipe(), AsynchCommandsService(this, "asynch_commands"), reset(false) { me = this; @@ -74,26 +108,40 @@ class ModuleAsynchCommands : public Module, public Pipe, public AsynchCommandsSe main_mutex.Lock(); - Implementation i[] = { I_OnPreCommand }; - ModuleManager::Attach(i, this, 1); + Implementation i[] = { I_OnObjectDestroy, I_OnPreCommand }; + ModuleManager::Attach(i, this, 2); ModuleManager::RegisterService(this); } + + void OnObjectDestroy(Base *b) + { + for (std::list<CommandMutex *>::iterator it = commands.begin(), it_end = commands.end(); it != it_end; ++it) + { + AsynchCommandMutex *cm = debug_cast<AsynchCommandMutex *>(*it); + + if (cm->started && (cm->command == b || cm->source.u == b || cm->source.owner == b || cm->source.service == b || cm->source.ci == b)) + cm->Destroy(); + } + + if (current_command == NULL) + this->Reset(); + else + this->reset = true; + } EventReturn OnPreCommand(CommandSource &source, Command *command, const std::vector<Anope::string> ¶ms) { - AsynchCommandMutex *cm = new AsynchCommandMutex(); + AsynchCommandMutex *cm = new AsynchCommandMutex(source, command, params); + try { - cm->mutex.Lock(); - cm->command = command; - cm->source = source; - cm->params = params; - // Give processing to the command thread Log(LOG_DEBUG_2) << "Waiting for command thread " << cm->command->name << " from " << source.u->nick; + current_command = cm; threadEngine.Start(cm); main_mutex.Lock(); + current_command = NULL; return EVENT_STOP; } @@ -108,9 +156,9 @@ class ModuleAsynchCommands : public Module, public Pipe, public AsynchCommandsSe void OnNotify() { - for (std::list<CommandMutex *>::iterator it = commands.begin(), it_end = commands.end(); it != it_end; ++it) + for (std::list<CommandMutex *>::iterator it = commands.begin(), it_end = commands.end(); it != it_end;) { - CommandMutex *cm = *it; + AsynchCommandMutex *cm = debug_cast<AsynchCommandMutex *>(*it++); // Thread engine will pick this up later if (cm->GetExitState() || !cm->processing) @@ -120,11 +168,30 @@ class ModuleAsynchCommands : public Module, public Pipe, public AsynchCommandsSe current_command = cm; // Unlock to give processing back to the command thread - cm->mutex.Unlock(); + if (!cm->started) + { + try + { + threadEngine.Start(cm); + } + catch (const CoreException &) + { + delete cm; + continue; + } + } + else + cm->mutex.Unlock(); // Relock to regain processing once the command thread hangs for any reason main_mutex.Lock(); current_command = NULL; + + if (this->reset) + { + this->Reset(); + return this->OnNotify(); + } } } diff --git a/modules/extra/m_mysql.cpp b/modules/extra/m_mysql.cpp index 12b4dbcc5..7ef4243e6 100644 --- a/modules/extra/m_mysql.cpp +++ b/modules/extra/m_mysql.cpp @@ -50,16 +50,13 @@ class MySQLResult : public SQLResult public: MySQLResult(const Anope::string &q, MYSQL_RES *r) : SQLResult(q), res(r) { - if (!res) - return; + unsigned num_fields = res ? mysql_num_fields(res) : 0; - unsigned num_fields = mysql_num_fields(res); + Log(LOG_DEBUG) << "SQL query " << q << " returned " << num_fields << " fields"; if (!num_fields) return; - Log(LOG_DEBUG) << "SQL query returned " << num_fields << " fields"; - for (MYSQL_ROW row; (row = mysql_fetch_row(res));) { MYSQL_FIELD *fields = mysql_fetch_fields(res); @@ -268,6 +265,8 @@ class ModuleSQL : public Module } this->DThread->Unlock(); + + this->SQLPipe->OnNotify(); } }; @@ -398,8 +397,11 @@ void DispatcherThread::Run() void MySQLPipe::OnNotify() { me->DThread->Lock(); + std::deque<QueryResult> finishedRequests = me->FinishedRequests; + me->FinishedRequests.clear(); + me->DThread->Unlock(); - for (std::deque<QueryResult>::const_iterator it = me->FinishedRequests.begin(), it_end = me->FinishedRequests.end(); it != it_end; ++it) + for (std::deque<QueryResult>::const_iterator it = finishedRequests.begin(), it_end = finishedRequests.end(); it != it_end; ++it) { const QueryResult &qr = *it; @@ -411,9 +413,6 @@ void MySQLPipe::OnNotify() else qr.sqlinterface->OnError(qr.result); } - me->FinishedRequests.clear(); - - me->DThread->Unlock(); } MODULE_INIT(ModuleSQL) |