summaryrefslogtreecommitdiff
path: root/modules/extra/m_async_commands.cpp
diff options
context:
space:
mode:
authorAdam <Adam@anope.org>2011-02-20 01:05:16 -0500
committerAdam <Adam@anope.org>2011-02-20 01:05:16 -0500
commitc83b2b73d7c5f264dedb67b878d116b5b10a4742 (patch)
tree407251a2bb3bf738194b6ec7654b0872b6bab1d5 /modules/extra/m_async_commands.cpp
parentdfbb5264fac5b418da536cc968aed4bf5cde8b76 (diff)
Much more work on the live SQL. Should work pretty decently now under heavy load.
Diffstat (limited to 'modules/extra/m_async_commands.cpp')
-rw-r--r--modules/extra/m_async_commands.cpp95
1 files changed, 81 insertions, 14 deletions
diff --git a/modules/extra/m_async_commands.cpp b/modules/extra/m_async_commands.cpp
index ddc660ea8..4f251431c 100644
--- a/modules/extra/m_async_commands.cpp
+++ b/modules/extra/m_async_commands.cpp
@@ -9,11 +9,15 @@ static Mutex main_mutex;
class AsynchCommandMutex : public CommandMutex
{
+ bool destroy;
public:
- AsynchCommandMutex() : CommandMutex()
+ bool started;
+
+ AsynchCommandMutex(CommandSource &s, Command *c, const std::vector<Anope::string> &p) : CommandMutex(s, c, p), destroy(false), started(false)
{
commands.push_back(this);
- current_command = this;
+
+ this->mutex.Lock();
}
~AsynchCommandMutex()
@@ -27,6 +31,8 @@ class AsynchCommandMutex : public CommandMutex
void Run()
{
+ this->started = true;
+
User *u = this->source.u;
BotInfo *bi = this->source.owner;
@@ -43,6 +49,8 @@ class AsynchCommandMutex : public CommandMutex
{
FOREACH_MOD(I_OnPostCommand, OnPostCommand(source, command, params));
}
+
+ source.DoReply();
}
main_mutex.Unlock();
@@ -50,6 +58,11 @@ class AsynchCommandMutex : public CommandMutex
void Lock()
{
+ if (this->destroy)
+ {
+ this->Exit();
+ }
+
this->processing = true;
me->Notify();
this->mutex.Lock();
@@ -60,13 +73,34 @@ class AsynchCommandMutex : public CommandMutex
this->processing = false;
main_mutex.Unlock();
}
+
+ void Destroy()
+ {
+ this->destroy = true;
+ }
};
class ModuleAsynchCommands : public Module, public Pipe, public AsynchCommandsService
{
+ bool reset;
+
+ void Reset()
+ {
+ this->reset = false;
+
+ unsigned count = 0, size = commands.size();
+ for (std::list<CommandMutex *>::iterator it = commands.begin(); count < size; ++count, ++it)
+ {
+ AsynchCommandMutex *cm = debug_cast<AsynchCommandMutex *>(*it);
+ cm->Destroy();
+
+ new AsynchCommandMutex(cm->source, cm->command, cm->params);
+ }
+ }
+
public:
- ModuleAsynchCommands(const Anope::string &modname, const Anope::string &creator) : Module(modname, creator), Pipe(), AsynchCommandsService(this, "asynch_commands")
+ ModuleAsynchCommands(const Anope::string &modname, const Anope::string &creator) : Module(modname, creator), Pipe(), AsynchCommandsService(this, "asynch_commands"), reset(false)
{
me = this;
@@ -74,26 +108,40 @@ class ModuleAsynchCommands : public Module, public Pipe, public AsynchCommandsSe
main_mutex.Lock();
- Implementation i[] = { I_OnPreCommand };
- ModuleManager::Attach(i, this, 1);
+ Implementation i[] = { I_OnObjectDestroy, I_OnPreCommand };
+ ModuleManager::Attach(i, this, 2);
ModuleManager::RegisterService(this);
}
+
+ void OnObjectDestroy(Base *b)
+ {
+ for (std::list<CommandMutex *>::iterator it = commands.begin(), it_end = commands.end(); it != it_end; ++it)
+ {
+ AsynchCommandMutex *cm = debug_cast<AsynchCommandMutex *>(*it);
+
+ if (cm->started && (cm->command == b || cm->source.u == b || cm->source.owner == b || cm->source.service == b || cm->source.ci == b))
+ cm->Destroy();
+ }
+
+ if (current_command == NULL)
+ this->Reset();
+ else
+ this->reset = true;
+ }
EventReturn OnPreCommand(CommandSource &source, Command *command, const std::vector<Anope::string> &params)
{
- AsynchCommandMutex *cm = new AsynchCommandMutex();
+ AsynchCommandMutex *cm = new AsynchCommandMutex(source, command, params);
+
try
{
- cm->mutex.Lock();
- cm->command = command;
- cm->source = source;
- cm->params = params;
-
// Give processing to the command thread
Log(LOG_DEBUG_2) << "Waiting for command thread " << cm->command->name << " from " << source.u->nick;
+ current_command = cm;
threadEngine.Start(cm);
main_mutex.Lock();
+ current_command = NULL;
return EVENT_STOP;
}
@@ -108,9 +156,9 @@ class ModuleAsynchCommands : public Module, public Pipe, public AsynchCommandsSe
void OnNotify()
{
- for (std::list<CommandMutex *>::iterator it = commands.begin(), it_end = commands.end(); it != it_end; ++it)
+ for (std::list<CommandMutex *>::iterator it = commands.begin(), it_end = commands.end(); it != it_end;)
{
- CommandMutex *cm = *it;
+ AsynchCommandMutex *cm = debug_cast<AsynchCommandMutex *>(*it++);
// Thread engine will pick this up later
if (cm->GetExitState() || !cm->processing)
@@ -120,11 +168,30 @@ class ModuleAsynchCommands : public Module, public Pipe, public AsynchCommandsSe
current_command = cm;
// Unlock to give processing back to the command thread
- cm->mutex.Unlock();
+ if (!cm->started)
+ {
+ try
+ {
+ threadEngine.Start(cm);
+ }
+ catch (const CoreException &)
+ {
+ delete cm;
+ continue;
+ }
+ }
+ else
+ cm->mutex.Unlock();
// Relock to regain processing once the command thread hangs for any reason
main_mutex.Lock();
current_command = NULL;
+
+ if (this->reset)
+ {
+ this->Reset();
+ return this->OnNotify();
+ }
}
}