diff options
Diffstat (limited to 'modules/extra/mysql.cpp')
-rw-r--r-- | modules/extra/mysql.cpp | 618 |
1 files changed, 618 insertions, 0 deletions
diff --git a/modules/extra/mysql.cpp b/modules/extra/mysql.cpp new file mode 100644 index 000000000..caa02d8f0 --- /dev/null +++ b/modules/extra/mysql.cpp @@ -0,0 +1,618 @@ +/* + * + * (C) 2010-2024 Anope Team + * Contact us at team@anope.org + * + * Please read COPYING and README for further details. + */ + +/* RequiredLibraries: mysqlclient */ +/* RequiredWindowsLibraries: libmysql */ + +#include "module.h" +#include "modules/sql.h" + +#ifdef WIN32 +# include <mysql.h> +#else +# include <mysql/mysql.h> +#endif + +#include <condition_variable> +#include <mutex> + +using namespace SQL; + +/** Non blocking threaded MySQL API, based loosely from InspIRCd's m_mysql.cpp + * + * This module spawns a single thread that is used to execute blocking MySQL queries. + * When a module requests a query to be executed it is added to a list for the thread + * (which never stops looping and sleeping) to pick up and execute, the result of which + * is inserted in to another queue to be picked up by the main thread. The main thread + * uses Pipe to become notified through the socket engine when there are results waiting + * to be sent back to the modules requesting the query + */ + +class MySQLService; + +/** A query request + */ +struct QueryRequest final +{ + /* The connection to the database */ + MySQLService *service; + /* The interface to use once we have the result to send the data back */ + Interface *sqlinterface; + /* The actual query */ + Query query; + + QueryRequest(MySQLService *s, Interface *i, const Query &q) : service(s), sqlinterface(i), query(q) { } +}; + +/** A query result */ +struct QueryResult final +{ + /* The interface to send the data back on */ + Interface *sqlinterface; + /* The result */ + Result result; + + QueryResult(Interface *i, Result &r) : sqlinterface(i), result(r) { } +}; + +/** A MySQL result + */ +class MySQLResult final + : public Result +{ + MYSQL_RES *res = nullptr; + +public: + MySQLResult(unsigned int i, const Query &q, const Anope::string &fq, MYSQL_RES *r) : Result(i, q, fq), res(r) + { + unsigned num_fields = res ? mysql_num_fields(res) : 0; + + /* It is not thread safe to log anything here using Log(this->owner) now :( */ + + if (!num_fields) + return; + + for (MYSQL_ROW row; (row = mysql_fetch_row(res));) + { + MYSQL_FIELD *fields = mysql_fetch_fields(res); + + if (fields) + { + std::map<Anope::string, Anope::string> items; + + for (unsigned field_count = 0; field_count < num_fields; ++field_count) + { + Anope::string column = (fields[field_count].name ? fields[field_count].name : ""); + Anope::string data = (row[field_count] ? row[field_count] : ""); + + items[column] = data; + } + + this->entries.push_back(items); + } + } + } + + MySQLResult(const Query &q, const Anope::string &fq, const Anope::string &err) : Result(0, q, fq, err) + { + } + + ~MySQLResult() + { + if (this->res) + mysql_free_result(this->res); + } +}; + +/** A MySQL connection, there can be multiple + */ +class MySQLService final + : public Provider +{ + std::map<Anope::string, std::set<Anope::string> > active_schema; + + Anope::string database; + Anope::string server; + Anope::string user; + Anope::string password; + unsigned int port; + Anope::string socket; + + MYSQL *sql = nullptr; + + /** Escape a query. + * Note the mutex must be held! + */ + Anope::string Escape(const Anope::string &query); + +public: + /* Locked by the SQL thread when a query is pending on this database, + * prevents us from deleting a connection while a query is executing + * in the thread + */ + std::mutex Lock; + + MySQLService(Module *o, const Anope::string &n, const Anope::string &d, const Anope::string &s, const Anope::string &u, const Anope::string &p, unsigned int po, const Anope::string &so); + + ~MySQLService(); + + void Run(Interface *i, const Query &query) override; + + Result RunQuery(const Query &query) override; + + std::vector<Query> CreateTable(const Anope::string &table, const Data &data) override; + + Query BuildInsert(const Anope::string &table, unsigned int id, Data &data) override; + + Query GetTables(const Anope::string &prefix) override; + + void Connect(); + + bool CheckConnection(); + + Anope::string BuildQuery(const Query &q); + + Anope::string FromUnixtime(time_t) override; + + Anope::string GetColumnType(Serialize::DataType dt) + { + switch (dt) + { + case Serialize::DataType::BOOL: + return "TINYINT NOT NULL"; + + case Serialize::DataType::FLOAT: + return "DOUBLE PRECISION NOT NULL"; + + case Serialize::DataType::INT: + return "BIGINT NOT NULL"; + + case Serialize::DataType::TEXT: + return "TEXT"; + + case Serialize::DataType::UINT: + return "BIGINT UNSIGNED NOT NULL"; + } + + return "TEXT"; // Should never be reached + } +}; + +/** The SQL thread used to execute queries + */ +class DispatcherThread final + : public Thread +{ +public: + std::condition_variable_any condvar; + std::mutex mutex; + + DispatcherThread() : Thread() { } + + void Run() override; +}; + +class ModuleSQL; +static ModuleSQL *me; + +class ModuleSQL final + : public Module + , public Pipe +{ + /* SQL connections */ + std::map<Anope::string, MySQLService *> MySQLServices; +public: + /* Pending query requests */ + std::deque<QueryRequest> QueryRequests; + /* Pending finished requests with results */ + std::deque<QueryResult> FinishedRequests; + /* The thread used to execute queries */ + DispatcherThread *DThread; + + ModuleSQL(const Anope::string &modname, const Anope::string &creator) : Module(modname, creator, EXTRA | VENDOR) + { + me = this; + + + DThread = new DispatcherThread(); + DThread->Start(); + } + + ~ModuleSQL() + { + for (std::map<Anope::string, MySQLService *>::iterator it = this->MySQLServices.begin(); it != this->MySQLServices.end(); ++it) + delete it->second; + MySQLServices.clear(); + + DThread->SetExitState(); + DThread->condvar.notify_all(); + DThread->Join(); + delete DThread; + } + + void OnReload(Configuration::Conf *conf) override + { + Configuration::Block *config = conf->GetModule(this); + + for (std::map<Anope::string, MySQLService *>::iterator it = this->MySQLServices.begin(); it != this->MySQLServices.end();) + { + const Anope::string &cname = it->first; + MySQLService *s = it->second; + int i; + + ++it; + + for (i = 0; i < config->CountBlock("mysql"); ++i) + if (config->GetBlock("mysql", i)->Get<const Anope::string>("name", "mysql/main") == cname) + break; + + if (i == config->CountBlock("mysql")) + { + Log(LOG_NORMAL, "mysql") << "MySQL: Removing server connection " << cname; + + delete s; + this->MySQLServices.erase(cname); + } + } + + for (int i = 0; i < config->CountBlock("mysql"); ++i) + { + Configuration::Block *block = config->GetBlock("mysql", i); + const Anope::string &connname = block->Get<const Anope::string>("name", "mysql/main"); + + if (this->MySQLServices.find(connname) == this->MySQLServices.end()) + { + const Anope::string &database = block->Get<const Anope::string>("database", "anope"); + const Anope::string &server = block->Get<const Anope::string>("server", "127.0.0.1"); + const Anope::string &user = block->Get<const Anope::string>("username", "anope"); + const Anope::string &password = block->Get<const Anope::string>("password"); + unsigned int port = block->Get<unsigned int>("port", "3306"); + const Anope::string &socket = block->Get<const Anope::string>("socket"); + + try + { + auto *ss = new MySQLService(this, connname, database, server, user, password, port, socket); + this->MySQLServices.emplace(connname, ss); + + Log(LOG_NORMAL, "mysql") << "MySQL: Successfully connected to server " << connname << " (" << (socket.empty() ? server + ":" + port : socket) << ")"; + } + catch (const SQL::Exception &ex) + { + Log(LOG_NORMAL, "mysql") << "MySQL: " << ex.GetReason(); + } + } + } + } + + void OnModuleUnload(User *, Module *m) override + { + this->DThread->mutex.lock(); + + for (unsigned i = this->QueryRequests.size(); i > 0; --i) + { + QueryRequest &r = this->QueryRequests[i - 1]; + + if (r.sqlinterface && r.sqlinterface->owner == m) + { + if (i == 1) + { + r.service->Lock.lock(); + r.service->Lock.unlock(); + } + + this->QueryRequests.erase(this->QueryRequests.begin() + i - 1); + } + } + + this->DThread->mutex.unlock(); + + this->OnNotify(); + } + + void OnNotify() override + { + this->DThread->mutex.lock(); + std::deque<QueryResult> finishedRequests = this->FinishedRequests; + this->FinishedRequests.clear(); + this->DThread->mutex.unlock(); + + for (const auto &qr : finishedRequests) + { + if (!qr.sqlinterface) + throw SQL::Exception("NULL qr.sqlinterface in MySQLPipe::OnNotify() ?"); + + if (qr.result.GetError().empty()) + qr.sqlinterface->OnResult(qr.result); + else + qr.sqlinterface->OnError(qr.result); + } + } +}; + +MySQLService::MySQLService(Module *o, const Anope::string &n, const Anope::string &d, const Anope::string &s, const Anope::string &u, const Anope::string &p, unsigned int po, const Anope::string &so) + : Provider(o, n) + , database(d) + , server(s) + , user(u) + , password(p) + , port(po) + , socket(so) +{ + Connect(); +} + +MySQLService::~MySQLService() +{ + me->DThread->mutex.lock(); + this->Lock.lock(); + mysql_close(this->sql); + this->sql = NULL; + + for (unsigned i = me->QueryRequests.size(); i > 0; --i) + { + QueryRequest &r = me->QueryRequests[i - 1]; + + if (r.service == this) + { + if (r.sqlinterface) + r.sqlinterface->OnError(Result(0, r.query, "SQL Interface is going away")); + me->QueryRequests.erase(me->QueryRequests.begin() + i - 1); + } + } + this->Lock.unlock(); + me->DThread->mutex.unlock(); +} + +void MySQLService::Run(Interface *i, const Query &query) +{ + me->DThread->mutex.lock(); + me->QueryRequests.push_back(QueryRequest(this, i, query)); + me->DThread->mutex.unlock(); + me->DThread->condvar.notify_all(); +} + +Result MySQLService::RunQuery(const Query &query) +{ + this->Lock.lock(); + + if (!this->CheckConnection()) + { + this->Lock.unlock(); + return MySQLResult(query, query.query, mysql_error(this->sql)); + } + + Anope::string real_query = this->BuildQuery(query); + if (!mysql_real_query(this->sql, real_query.c_str(), real_query.length())) + { + MYSQL_RES *res = mysql_store_result(this->sql); + unsigned int id = mysql_insert_id(this->sql); + + /* because we enabled CLIENT_MULTI_RESULTS in our options + * a multiple statement or a procedure call can return + * multiple result sets. + * we must process them all before the next query. + */ + + while (!mysql_next_result(this->sql)) + mysql_free_result(mysql_store_result(this->sql)); + + this->Lock.unlock(); + return MySQLResult(id, query, real_query, res); + } + else + { + Anope::string error = mysql_error(this->sql); + this->Lock.unlock(); + return MySQLResult(query, real_query, error); + } +} + +std::vector<Query> MySQLService::CreateTable(const Anope::string &table, const Data &data) +{ + std::vector<Query> queries; + std::set<Anope::string> &known_cols = this->active_schema[table]; + + if (known_cols.empty()) + { + Log(LOG_DEBUG) << "mysql: Fetching columns for " << table; + + Result columns = this->RunQuery("SHOW COLUMNS FROM `" + table + "`"); + for (int i = 0; i < columns.Rows(); ++i) + { + const Anope::string &column = columns.Get(i, "Field"); + + Log(LOG_DEBUG) << "mysql: Column #" << i << " for " << table << ": " << column; + known_cols.insert(column); + } + } + + if (known_cols.empty()) + { + Anope::string query_text = "CREATE TABLE `" + table + "` (`id` BIGINT UNSIGNED NOT NULL AUTO_INCREMENT UNIQUE," + " `timestamp` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP"; + for (const auto &[column, _] : data.data) + { + known_cols.insert(column); + + query_text += ", `" + column + "` " + GetColumnType(data.GetType(column)); + } + query_text += ", PRIMARY KEY (`id`), KEY `timestamp_idx` (`timestamp`)) ROW_FORMAT=DYNAMIC"; + queries.push_back(query_text); + } + else + { + for (const auto &[column, _] : data.data) + { + if (known_cols.count(column) > 0) + continue; + + known_cols.insert(column); + + Anope::string query_text = "ALTER TABLE `" + table + "` ADD `" + column + "` " + GetColumnType(data.GetType(column)); + + queries.push_back(query_text); + } + } + + return queries; +} + +Query MySQLService::BuildInsert(const Anope::string &table, unsigned int id, Data &data) +{ + /* Empty columns not present in the data set */ + for (const auto &known_col : this->active_schema[table]) + { + if (known_col != "id" && known_col != "timestamp" && data.data.count(known_col) == 0) + data[known_col] << ""; + } + + Anope::string query_text = "INSERT INTO `" + table + "` (`id`"; + + for (const auto &[field, _] : data.data) + query_text += ",`" + field + "`"; + query_text += ") VALUES (" + Anope::ToString(id); + for (const auto &[field, _] : data.data) + query_text += ",@" + field + "@"; + query_text += ") ON DUPLICATE KEY UPDATE "; + for (const auto &[field, _] : data.data) + query_text += "`" + field + "`=VALUES(`" + field + "`),"; + query_text.erase(query_text.end() - 1); + + Query query(query_text); + for (auto &[field, value] : data.data) + { + Anope::string buf; + *value >> buf; + + auto escape = true; + switch (data.GetType(field)) + { + case Serialize::DataType::BOOL: + case Serialize::DataType::FLOAT: + case Serialize::DataType::INT: + case Serialize::DataType::UINT: + { + if (buf.empty()) + buf = "0"; + escape = false; + break; + } + + case Serialize::DataType::TEXT: + { + if (buf.empty()) + { + buf = "NULL"; + escape = false; + } + break; + } + } + + query.SetValue(field, buf, escape); + } + + return query; +} + +Query MySQLService::GetTables(const Anope::string &prefix) +{ + return Query("SHOW TABLES LIKE '" + prefix + "%';"); +} + +void MySQLService::Connect() +{ + this->sql = mysql_init(this->sql); + + const unsigned int timeout = 1; + mysql_options(this->sql, MYSQL_OPT_CONNECT_TIMEOUT, reinterpret_cast<const char *>(&timeout)); + + bool connect = mysql_real_connect(this->sql, this->server.c_str(), this->user.c_str(), this->password.c_str(), this->database.c_str(), this->port, this->socket.empty() ? nullptr : this->socket.c_str(), CLIENT_MULTI_RESULTS); + + if (!connect) + throw SQL::Exception("Unable to connect to MySQL service " + this->name + ": " + mysql_error(this->sql)); + + if (this->socket.empty()) + Log(LOG_DEBUG) << "Successfully connected to MySQL service " << this->name << " at " << this->server << ":" << this->port; + else + Log(LOG_DEBUG) << "Successfully connected to MySQL service " << this->name << " at " << this->socket; +} + + +bool MySQLService::CheckConnection() +{ + if (!this->sql || mysql_ping(this->sql)) + { + try + { + this->Connect(); + } + catch (const SQL::Exception &) + { + return false; + } + } + + return true; +} + +Anope::string MySQLService::Escape(const Anope::string &query) +{ + std::vector<char> buffer(query.length() * 2 + 1); + mysql_real_escape_string(this->sql, &buffer[0], query.c_str(), query.length()); + return &buffer[0]; +} + +Anope::string MySQLService::BuildQuery(const Query &q) +{ + Anope::string real_query = q.query; + + for (const auto &[name, value] : q.parameters) + real_query = real_query.replace_all_cs("@" + name + "@", (value.escape ? ("'" + this->Escape(value.data) + "'") : value.data)); + + return real_query; +} + +Anope::string MySQLService::FromUnixtime(time_t t) +{ + return "FROM_UNIXTIME(" + Anope::ToString(t) + ")"; +} + +void DispatcherThread::Run() +{ + this->mutex.lock(); + + while (!this->GetExitState()) + { + if (!me->QueryRequests.empty()) + { + QueryRequest &r = me->QueryRequests.front(); + this->mutex.unlock(); + + Result sresult = r.service->RunQuery(r.query); + + this->mutex.lock(); + if (!me->QueryRequests.empty() && me->QueryRequests.front().query == r.query) + { + if (r.sqlinterface) + me->FinishedRequests.push_back(QueryResult(r.sqlinterface, sresult)); + me->QueryRequests.pop_front(); + } + } + else + { + if (!me->FinishedRequests.empty()) + me->Notify(); + this->condvar.wait(this->mutex); + } + } + + this->mutex.unlock(); +} + +MODULE_INIT(ModuleSQL) |