diff options
author | Adam <Adam@anope.org> | 2011-02-20 01:05:16 -0500 |
---|---|---|
committer | Adam <Adam@anope.org> | 2011-02-20 01:05:16 -0500 |
commit | c83b2b73d7c5f264dedb67b878d116b5b10a4742 (patch) | |
tree | 407251a2bb3bf738194b6ec7654b0872b6bab1d5 /modules/extra/m_async_commands.cpp | |
parent | dfbb5264fac5b418da536cc968aed4bf5cde8b76 (diff) |
Much more work on the live SQL. Should work pretty decently now under heavy load.
Diffstat (limited to 'modules/extra/m_async_commands.cpp')
-rw-r--r-- | modules/extra/m_async_commands.cpp | 95 |
1 files changed, 81 insertions, 14 deletions
diff --git a/modules/extra/m_async_commands.cpp b/modules/extra/m_async_commands.cpp index ddc660ea8..4f251431c 100644 --- a/modules/extra/m_async_commands.cpp +++ b/modules/extra/m_async_commands.cpp @@ -9,11 +9,15 @@ static Mutex main_mutex; class AsynchCommandMutex : public CommandMutex { + bool destroy; public: - AsynchCommandMutex() : CommandMutex() + bool started; + + AsynchCommandMutex(CommandSource &s, Command *c, const std::vector<Anope::string> &p) : CommandMutex(s, c, p), destroy(false), started(false) { commands.push_back(this); - current_command = this; + + this->mutex.Lock(); } ~AsynchCommandMutex() @@ -27,6 +31,8 @@ class AsynchCommandMutex : public CommandMutex void Run() { + this->started = true; + User *u = this->source.u; BotInfo *bi = this->source.owner; @@ -43,6 +49,8 @@ class AsynchCommandMutex : public CommandMutex { FOREACH_MOD(I_OnPostCommand, OnPostCommand(source, command, params)); } + + source.DoReply(); } main_mutex.Unlock(); @@ -50,6 +58,11 @@ class AsynchCommandMutex : public CommandMutex void Lock() { + if (this->destroy) + { + this->Exit(); + } + this->processing = true; me->Notify(); this->mutex.Lock(); @@ -60,13 +73,34 @@ class AsynchCommandMutex : public CommandMutex this->processing = false; main_mutex.Unlock(); } + + void Destroy() + { + this->destroy = true; + } }; class ModuleAsynchCommands : public Module, public Pipe, public AsynchCommandsService { + bool reset; + + void Reset() + { + this->reset = false; + + unsigned count = 0, size = commands.size(); + for (std::list<CommandMutex *>::iterator it = commands.begin(); count < size; ++count, ++it) + { + AsynchCommandMutex *cm = debug_cast<AsynchCommandMutex *>(*it); + cm->Destroy(); + + new AsynchCommandMutex(cm->source, cm->command, cm->params); + } + } + public: - ModuleAsynchCommands(const Anope::string &modname, const Anope::string &creator) : Module(modname, creator), Pipe(), AsynchCommandsService(this, "asynch_commands") + ModuleAsynchCommands(const Anope::string &modname, const Anope::string &creator) : Module(modname, creator), Pipe(), AsynchCommandsService(this, "asynch_commands"), reset(false) { me = this; @@ -74,26 +108,40 @@ class ModuleAsynchCommands : public Module, public Pipe, public AsynchCommandsSe main_mutex.Lock(); - Implementation i[] = { I_OnPreCommand }; - ModuleManager::Attach(i, this, 1); + Implementation i[] = { I_OnObjectDestroy, I_OnPreCommand }; + ModuleManager::Attach(i, this, 2); ModuleManager::RegisterService(this); } + + void OnObjectDestroy(Base *b) + { + for (std::list<CommandMutex *>::iterator it = commands.begin(), it_end = commands.end(); it != it_end; ++it) + { + AsynchCommandMutex *cm = debug_cast<AsynchCommandMutex *>(*it); + + if (cm->started && (cm->command == b || cm->source.u == b || cm->source.owner == b || cm->source.service == b || cm->source.ci == b)) + cm->Destroy(); + } + + if (current_command == NULL) + this->Reset(); + else + this->reset = true; + } EventReturn OnPreCommand(CommandSource &source, Command *command, const std::vector<Anope::string> ¶ms) { - AsynchCommandMutex *cm = new AsynchCommandMutex(); + AsynchCommandMutex *cm = new AsynchCommandMutex(source, command, params); + try { - cm->mutex.Lock(); - cm->command = command; - cm->source = source; - cm->params = params; - // Give processing to the command thread Log(LOG_DEBUG_2) << "Waiting for command thread " << cm->command->name << " from " << source.u->nick; + current_command = cm; threadEngine.Start(cm); main_mutex.Lock(); + current_command = NULL; return EVENT_STOP; } @@ -108,9 +156,9 @@ class ModuleAsynchCommands : public Module, public Pipe, public AsynchCommandsSe void OnNotify() { - for (std::list<CommandMutex *>::iterator it = commands.begin(), it_end = commands.end(); it != it_end; ++it) + for (std::list<CommandMutex *>::iterator it = commands.begin(), it_end = commands.end(); it != it_end;) { - CommandMutex *cm = *it; + AsynchCommandMutex *cm = debug_cast<AsynchCommandMutex *>(*it++); // Thread engine will pick this up later if (cm->GetExitState() || !cm->processing) @@ -120,11 +168,30 @@ class ModuleAsynchCommands : public Module, public Pipe, public AsynchCommandsSe current_command = cm; // Unlock to give processing back to the command thread - cm->mutex.Unlock(); + if (!cm->started) + { + try + { + threadEngine.Start(cm); + } + catch (const CoreException &) + { + delete cm; + continue; + } + } + else + cm->mutex.Unlock(); // Relock to regain processing once the command thread hangs for any reason main_mutex.Lock(); current_command = NULL; + + if (this->reset) + { + this->Reset(); + return this->OnNotify(); + } } } |