diff options
-rw-r--r-- | include/threadengine.h | 55 | ||||
-rw-r--r-- | modules/extra/ldap.cpp | 55 | ||||
-rw-r--r-- | modules/extra/mysql.cpp | 53 | ||||
-rw-r--r-- | src/threadengine.cpp | 45 |
4 files changed, 59 insertions, 149 deletions
diff --git a/include/threadengine.h b/include/threadengine.h index 79e545166..7849ab89d 100644 --- a/include/threadengine.h +++ b/include/threadengine.h @@ -59,58 +59,3 @@ public: */ virtual void Run() = 0; }; - -class CoreExport Mutex -{ -protected: - /* A mutex, used to keep threads in sync */ - pthread_mutex_t mutex; - -public: - /** Constructor - */ - Mutex(); - - /** Destructor - */ - ~Mutex(); - - /** Attempt to lock the mutex, will hang until a lock can be achieved - */ - void Lock(); - - /** Unlock the mutex, it must be locked first - */ - void Unlock(); - - /** Attempt to lock the mutex, will return true on success and false on fail - * Does not block - * @return true or false - */ - bool TryLock(); -}; - -class CoreExport Condition - : public Mutex -{ -private: - /* A condition */ - pthread_cond_t cond; - -public: - /** Constructor - */ - Condition(); - - /** Destructor - */ - ~Condition(); - - /** Called to wakeup the waiter - */ - void Wakeup(); - - /** Called to wait for a Wakeup() call - */ - void Wait(); -}; diff --git a/modules/extra/ldap.cpp b/modules/extra/ldap.cpp index 31e712662..58ecefd6d 100644 --- a/modules/extra/ldap.cpp +++ b/modules/extra/ldap.cpp @@ -14,6 +14,9 @@ #include "module.h" #include "modules/ldap.h" +#include <condition_variable> +#include <mutex> + #ifdef _WIN32 # include <Winldap.h> # include <WinBer.h> @@ -162,7 +165,6 @@ public: class LDAPService final : public LDAPProvider , public Thread - , public Condition { Anope::string server; Anope::string admin_binddn; @@ -173,6 +175,9 @@ class LDAPService final time_t last_connect = 0; public: + std::condition_variable_any condvar; + std::mutex mutex; + static LDAPMod **BuildMods(const LDAPMods &attributes) { LDAPMod **mods = new LDAPMod*[attributes.size() + 1]; @@ -301,16 +306,16 @@ private: void QueueRequest(LDAPRequest *r) { - this->Lock(); + this->mutex.lock(); this->queries.push_back(r); - this->Wakeup(); - this->Unlock(); + this->condvar.notify_all(); + this->mutex.unlock(); } public: typedef std::vector<LDAPRequest *> query_queue; query_queue queries, results; - Mutex process_mutex; /* held when processing requests not in either queue */ + std::mutex process_mutex; /* held when processing requests not in either queue */ LDAPService(Module *o, const Anope::string &n, const Anope::string &s, const Anope::string &b, const Anope::string &p) : LDAPProvider(o, n), server(s), admin_binddn(b), admin_pass(p) { @@ -321,7 +326,7 @@ public: { /* At this point the thread has stopped so we don't need to hold process_mutex */ - this->Lock(); + this->mutex.lock(); for (auto *req : this->queries) { @@ -346,7 +351,7 @@ public: delete req; } - this->Unlock(); + this->mutex.unlock(); ldap_unbind_ext(this->con, NULL, NULL); } @@ -445,16 +450,16 @@ private: void SendRequests() { - process_mutex.Lock(); + process_mutex.lock(); query_queue q; - this->Lock(); + this->mutex.lock(); queries.swap(q); - this->Unlock(); + this->mutex.unlock(); if (q.empty()) { - process_mutex.Unlock(); + process_mutex.unlock(); return; } @@ -478,14 +483,14 @@ private: BuildReply(ret, req); - this->Lock(); + this->mutex.lock(); results.push_back(req); - this->Unlock(); + this->mutex.unlock(); } me->Notify(); - process_mutex.Unlock(); + process_mutex.unlock(); } public: @@ -493,11 +498,11 @@ public: { while (!this->GetExitState()) { - this->Lock(); + this->mutex.lock(); /* Queries can be non empty if one is pushed during SendRequests() */ if (queries.empty()) - this->Wait(); - this->Unlock(); + this->condvar.wait(this->mutex); + this->mutex.unlock(); SendRequests(); } @@ -527,7 +532,7 @@ public: for (std::map<Anope::string, LDAPService *>::iterator it = this->LDAPServices.begin(); it != this->LDAPServices.end(); ++it) { it->second->SetExitState(); - it->second->Wakeup(); + it->second->condvar.notify_all(); it->second->Join(); delete it->second; } @@ -555,7 +560,7 @@ public: Log(LOG_NORMAL, "ldap") << "LDAP: Removing server connection " << cname; s->SetExitState(); - s->Wakeup(); + s->condvar.notify_all(); s->Join(); delete s; this->LDAPServices.erase(cname); @@ -596,8 +601,8 @@ public: { LDAPService *s = it->second; - s->process_mutex.Lock(); - s->Lock(); + s->process_mutex.lock(); + s->mutex.lock(); for (unsigned int i = s->queries.size(); i > 0; --i) { @@ -622,8 +627,8 @@ public: } } - s->Unlock(); - s->process_mutex.Unlock(); + s->mutex.unlock(); + s->process_mutex.unlock(); } } @@ -634,9 +639,9 @@ public: LDAPService *s = it->second; LDAPService::query_queue results; - s->Lock(); + s->mutex.lock(); results.swap(s->results); - s->Unlock(); + s->mutex.unlock(); for (const auto *req : results) { diff --git a/modules/extra/mysql.cpp b/modules/extra/mysql.cpp index f1baff7b9..49149464f 100644 --- a/modules/extra/mysql.cpp +++ b/modules/extra/mysql.cpp @@ -18,6 +18,9 @@ # include <mysql/mysql.h> #endif +#include <condition_variable> +#include <mutex> + using namespace SQL; /** Non blocking threaded MySQL API, based loosely from InspIRCd's m_mysql.cpp @@ -131,7 +134,7 @@ public: * prevents us from deleting a connection while a query is executing * in the thread */ - Mutex Lock; + std::mutex Lock; MySQLService(Module *o, const Anope::string &n, const Anope::string &d, const Anope::string &s, const Anope::string &u, const Anope::string &p, unsigned int po); @@ -160,9 +163,11 @@ public: */ class DispatcherThread final : public Thread - , public Condition { public: + std::condition_variable_any condvar; + std::mutex mutex; + DispatcherThread() : Thread() { } void Run() override; @@ -201,7 +206,7 @@ public: MySQLServices.clear(); DThread->SetExitState(); - DThread->Wakeup(); + DThread->condvar.notify_all(); DThread->Join(); delete DThread; } @@ -261,7 +266,7 @@ public: void OnModuleUnload(User *, Module *m) override { - this->DThread->Lock(); + this->DThread->mutex.lock(); for (unsigned i = this->QueryRequests.size(); i > 0; --i) { @@ -271,25 +276,25 @@ public: { if (i == 1) { - r.service->Lock.Lock(); - r.service->Lock.Unlock(); + r.service->Lock.lock(); + r.service->Lock.unlock(); } this->QueryRequests.erase(this->QueryRequests.begin() + i - 1); } } - this->DThread->Unlock(); + this->DThread->mutex.unlock(); this->OnNotify(); } void OnNotify() override { - this->DThread->Lock(); + this->DThread->mutex.lock(); std::deque<QueryResult> finishedRequests = this->FinishedRequests; this->FinishedRequests.clear(); - this->DThread->Unlock(); + this->DThread->mutex.unlock(); for (const auto &qr : finishedRequests) { @@ -317,8 +322,8 @@ MySQLService::MySQLService(Module *o, const Anope::string &n, const Anope::strin MySQLService::~MySQLService() { - me->DThread->Lock(); - this->Lock.Lock(); + me->DThread->mutex.lock(); + this->Lock.lock(); mysql_close(this->sql); this->sql = NULL; @@ -333,21 +338,21 @@ MySQLService::~MySQLService() me->QueryRequests.erase(me->QueryRequests.begin() + i - 1); } } - this->Lock.Unlock(); - me->DThread->Unlock(); + this->Lock.unlock(); + me->DThread->mutex.unlock(); } void MySQLService::Run(Interface *i, const Query &query) { - me->DThread->Lock(); + me->DThread->mutex.lock(); me->QueryRequests.push_back(QueryRequest(this, i, query)); - me->DThread->Unlock(); - me->DThread->Wakeup(); + me->DThread->mutex.unlock(); + me->DThread->condvar.notify_all(); } Result MySQLService::RunQuery(const Query &query) { - this->Lock.Lock(); + this->Lock.lock(); Anope::string real_query = this->BuildQuery(query); @@ -365,13 +370,13 @@ Result MySQLService::RunQuery(const Query &query) while (!mysql_next_result(this->sql)) mysql_free_result(mysql_store_result(this->sql)); - this->Lock.Unlock(); + this->Lock.unlock(); return MySQLResult(id, query, real_query, res); } else { Anope::string error = mysql_error(this->sql); - this->Lock.Unlock(); + this->Lock.unlock(); return MySQLResult(query, real_query, error); } } @@ -536,18 +541,18 @@ Anope::string MySQLService::FromUnixtime(time_t t) void DispatcherThread::Run() { - this->Lock(); + this->mutex.lock(); while (!this->GetExitState()) { if (!me->QueryRequests.empty()) { QueryRequest &r = me->QueryRequests.front(); - this->Unlock(); + this->mutex.unlock(); Result sresult = r.service->RunQuery(r.query); - this->Lock(); + this->mutex.lock(); if (!me->QueryRequests.empty() && me->QueryRequests.front().query == r.query) { if (r.sqlinterface) @@ -559,11 +564,11 @@ void DispatcherThread::Run() { if (!me->FinishedRequests.empty()) me->Notify(); - this->Wait(); + this->condvar.wait(this->mutex); } } - this->Unlock(); + this->mutex.unlock(); } MODULE_INIT(ModuleSQL) diff --git a/src/threadengine.cpp b/src/threadengine.cpp index 0ae709db9..5630a99aa 100644 --- a/src/threadengine.cpp +++ b/src/threadengine.cpp @@ -81,48 +81,3 @@ void Thread::OnNotify() this->Join(); this->flags[SF_DEAD] = true; } - -Mutex::Mutex() -{ - pthread_mutex_init(&mutex, NULL); -} - -Mutex::~Mutex() -{ - pthread_mutex_destroy(&mutex); -} - -void Mutex::Lock() -{ - pthread_mutex_lock(&mutex); -} - -void Mutex::Unlock() -{ - pthread_mutex_unlock(&mutex); -} - -bool Mutex::TryLock() -{ - return pthread_mutex_trylock(&mutex) == 0; -} - -Condition::Condition() : Mutex() -{ - pthread_cond_init(&cond, NULL); -} - -Condition::~Condition() -{ - pthread_cond_destroy(&cond); -} - -void Condition::Wakeup() -{ - pthread_cond_signal(&cond); -} - -void Condition::Wait() -{ - pthread_cond_wait(&cond, &mutex); -} |