TrinityCore
Loading...
Searching...
No Matches
DatabaseWorkerPool.cpp
Go to the documentation of this file.
1/*
2 * This file is part of the TrinityCore Project. See AUTHORS file for Copyright information
3 *
4 * This program is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License as published by the
6 * Free Software Foundation; either version 2 of the License, or (at your
7 * option) any later version.
8 *
9 * This program is distributed in the hope that it will be useful, but WITHOUT
10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
12 * more details.
13 *
14 * You should have received a copy of the GNU General Public License along
15 * with this program. If not, see <http://www.gnu.org/licenses/>.
16 */
17
18#include "DatabaseWorkerPool.h"
19#include "AdhocStatement.h"
20#include "Common.h"
21#include "Errors.h"
25#include "Log.h"
27#include "PreparedStatement.h"
29#include "QueryCallback.h"
30#include "QueryHolder.h"
31#include "QueryResult.h"
32#include "SQLOperation.h"
33#include "Transaction.h"
34#include "MySQLWorkaround.h"
35#include <mysqld_error.h>
36#ifdef TRINITY_DEBUG
37#include <sstream>
38#include <boost/stacktrace.hpp>
39#endif
40
41static consteval uint32 ParseVersionString(std::string_view chars)
42{
43 uint32 result = 0;
44 uint32 partialResult = 0;
45 uint32 multiplier = 10000;
46 for (std::size_t i = 0; i < chars.length(); ++i)
47 {
48 char c = chars[i];
49 if (c == '.')
50 {
51 if (multiplier < 100)
52 throw "Too many . characters in version string";
53
54 result += partialResult * multiplier;
55 multiplier /= 100;
56 partialResult = 0;
57 }
58 else if (c >= '0' && c <= '9')
59 {
60 partialResult *= 10;
61 partialResult += c - '0';
62 }
63 else
64 throw "Invalid input character";
65 }
66
67 result += partialResult * multiplier;
68
69 return result;
70}
71
73{
75 bool Execute() override
76 {
77 m_conn->Ping();
78 return true;
79 }
80};
81
82template <class T>
84 : _queue(new ProducerConsumerQueue<SQLOperation*>()),
85 _async_threads(0), _synch_threads(0)
86{
87 // We only need check compiled version match on Windows
88 // because on other platforms ABI compatibility is ensured by SOVERSION
89 // and Windows MySQL releases don't even have abi-version-like component in their dll file name
90#if TRINITY_PLATFORM == TRINITY_PLATFORM_WINDOWS
91#if defined(LIBMARIADB) && MARIADB_PACKAGE_VERSION_ID >= 30200
92#define TRINITY_COMPILED_CLIENT_VERSION MARIADB_PACKAGE_VERSION_ID
93#else
94#define TRINITY_COMPILED_CLIENT_VERSION MYSQL_VERSION_ID
95#endif
96 WPFatal(mysql_get_client_version() == TRINITY_COMPILED_CLIENT_VERSION, "Used " TRINITY_MYSQL_FLAVOR " library version (%s id %lu) does not match the version id used to compile TrinityCore (id %u). Search on forum for TCE00011.", mysql_get_client_info(), mysql_get_client_version(), TRINITY_COMPILED_CLIENT_VERSION);
97#undef TRINITY_COMPILED_CLIENT_VERSION
98#endif
99}
100
101template <class T>
103{
104 _queue->Cancel();
105}
106
107template <class T>
108void DatabaseWorkerPool<T>::SetConnectionInfo(std::string const& infoString,
109 uint8 const asyncThreads, uint8 const synchThreads)
110{
111 _connectionInfo = std::make_unique<MySQLConnectionInfo>(infoString);
112
113 _async_threads = asyncThreads;
114 _synch_threads = synchThreads;
115}
116
117template <class T>
119{
120 WPFatal(_connectionInfo.get(), "Connection info was not set!");
121
122 TC_LOG_INFO("sql.driver", "Opening DatabasePool '{}'. "
123 "Asynchronous connections: {}, synchronous connections: {}.",
124 GetDatabaseName(), _async_threads, _synch_threads);
125
126 uint32 error = OpenConnections(IDX_ASYNC, _async_threads);
127
128 if (error)
129 return error;
130
131 error = OpenConnections(IDX_SYNCH, _synch_threads);
132
133 if (!error)
134 {
135 TC_LOG_INFO("sql.driver", "DatabasePool '{}' opened successfully. "
136 "{} total connections running.", GetDatabaseName(),
137 (_connections[IDX_SYNCH].size() + _connections[IDX_ASYNC].size()));
138 }
139
140 return error;
141}
142
143template <class T>
145{
146 TC_LOG_INFO("sql.driver", "Closing down DatabasePool '{}'.", GetDatabaseName());
147
149 _connections[IDX_ASYNC].clear();
150
151 TC_LOG_INFO("sql.driver", "Asynchronous connections on DatabasePool '{}' terminated. "
152 "Proceeding with synchronous connections.",
153 GetDatabaseName());
154
159 _connections[IDX_SYNCH].clear();
160
161 TC_LOG_INFO("sql.driver", "All connections on DatabasePool '{}' closed.", GetDatabaseName());
162}
163
164template <class T>
166{
167 for (auto& connections : _connections)
168 {
169 for (auto& connection : connections)
170 {
171 connection->LockIfReady();
172 if (!connection->PrepareStatements())
173 {
174 connection->Unlock();
175 Close();
176 return false;
177 }
178 else
179 connection->Unlock();
180
181 size_t const preparedSize = connection->m_stmts.size();
182 if (_preparedStatementSize.size() < preparedSize)
183 _preparedStatementSize.resize(preparedSize);
184
185 for (size_t i = 0; i < preparedSize; ++i)
186 {
187 // already set by another connection
188 // (each connection only has prepared statements of it's own type sync/async)
189 if (_preparedStatementSize[i] > 0)
190 continue;
191
192 if (MySQLPreparedStatement* stmt = connection->m_stmts[i].get())
193 {
194 uint32 const paramCount = stmt->GetParameterCount();
195
196 // TC only supports uint8 indices.
197 ASSERT(paramCount < std::numeric_limits<uint8>::max());
198
199 _preparedStatementSize[i] = static_cast<uint8>(paramCount);
200 }
201 }
202 }
203 }
204
205 return true;
206}
207
208template <class T>
209QueryResult DatabaseWorkerPool<T>::Query(char const* sql, T* connection /*= nullptr*/)
210{
211 if (!connection)
212 connection = GetFreeConnection();
213
214 ResultSet* result = connection->Query(sql);
215 connection->Unlock();
216 if (!result || !result->GetRowCount() || !result->NextRow())
217 {
218 delete result;
219 return QueryResult(nullptr);
220 }
221
222 return QueryResult(result);
223}
224
225template <class T>
227{
228 auto connection = GetFreeConnection();
229 PreparedResultSet* ret = connection->Query(stmt);
230 connection->Unlock();
231
233 delete stmt;
234
235 if (!ret || !ret->GetRowCount())
236 {
237 delete ret;
238 return PreparedQueryResult(nullptr);
239 }
240
241 return PreparedQueryResult(ret);
242}
243
244template <class T>
246{
247 BasicStatementTask* task = new BasicStatementTask(sql, true);
248 // Store future result before enqueueing - task might get already processed and deleted before returning from this method
249 QueryResultFuture result = task->GetFuture();
250 Enqueue(task);
251 return QueryCallback(std::move(result));
252}
253
254template <class T>
256{
257 PreparedStatementTask* task = new PreparedStatementTask(stmt, true);
258 // Store future result before enqueueing - task might get already processed and deleted before returning from this method
259 PreparedQueryResultFuture result = task->GetFuture();
260 Enqueue(task);
261 return QueryCallback(std::move(result));
262}
263
264template <class T>
266{
267 SQLQueryHolderTask* task = new SQLQueryHolderTask(holder);
268 // Store future result before enqueueing - task might get already processed and deleted before returning from this method
269 QueryResultHolderFuture result = task->GetFuture();
270 Enqueue(task);
271 return { std::move(holder), std::move(result) };
272}
273
274template <class T>
276{
277 return std::make_shared<Transaction<T>>();
278}
279
280template <class T>
282{
283#ifdef TRINITY_DEBUG
287 switch (transaction->GetSize())
288 {
289 case 0:
290 TC_LOG_DEBUG("sql.driver", "Transaction contains 0 queries. Not executing.");
291 return;
292 case 1:
293 TC_LOG_DEBUG("sql.driver", "Warning: Transaction only holds 1 query, consider removing Transaction context in code.");
294 break;
295 default:
296 break;
297 }
298#endif // TRINITY_DEBUG
299
300 Enqueue(new TransactionTask(transaction));
301}
302
303template <class T>
305{
306#ifdef TRINITY_DEBUG
310 switch (transaction->GetSize())
311 {
312 case 0:
313 TC_LOG_DEBUG("sql.driver", "Transaction contains 0 queries. Not executing.");
314 break;
315 case 1:
316 TC_LOG_DEBUG("sql.driver", "Warning: Transaction only holds 1 query, consider removing Transaction context in code.");
317 break;
318 default:
319 break;
320 }
321#endif // TRINITY_DEBUG
322
324 TransactionFuture result = task->GetFuture();
325 Enqueue(task);
326 return TransactionCallback(std::move(result));
327}
328
329template <class T>
331{
332 T* connection = GetFreeConnection();
333 int errorCode = connection->ExecuteTransaction(transaction);
334 if (!errorCode)
335 {
336 connection->Unlock(); // OK, operation succesful
337 return;
338 }
339
342 if (errorCode == ER_LOCK_DEADLOCK)
343 {
344 //todo: handle multiple sync threads deadlocking in a similar way as async threads
345 uint8 loopBreaker = 5;
346 for (uint8 i = 0; i < loopBreaker; ++i)
347 {
348 if (!connection->ExecuteTransaction(transaction))
349 break;
350 }
351 }
352
354 transaction->Cleanup();
355
356 connection->Unlock();
357}
358
359template <class T>
361{
362 return new PreparedStatement<T>(index, _preparedStatementSize[index]);
363}
364
365template <class T>
367{
368 if (str.empty())
369 return;
370
371 char* buf = new char[str.size() * 2 + 1];
372 EscapeString(buf, str.c_str(), uint32(str.size()));
373 str = buf;
374 delete[] buf;
375}
376
377template <class T>
379{
381 for (auto& connection : _connections[IDX_SYNCH])
382 {
383 if (connection->LockIfReady())
384 {
385 connection->Ping();
386 connection->Unlock();
387 }
388 }
389
393 auto const count = _connections[IDX_ASYNC].size();
394 for (uint8 i = 0; i < count; ++i)
395 Enqueue(new PingOperation);
396}
397
398template <class T>
400{
401 for (uint8 i = 0; i < numConnections; ++i)
402 {
403 // Create the connection
404 auto connection = [&] {
405 switch (type)
406 {
407 case IDX_ASYNC:
408 return std::make_unique<T>(_queue.get(), *_connectionInfo);
409 case IDX_SYNCH:
410 return std::make_unique<T>(*_connectionInfo);
411 default:
412 ABORT();
413 }
414 }();
415
416 if (uint32 error = connection->Open())
417 {
418 // Failed to open a connection or invalid version, abort and cleanup
419 _connections[type].clear();
420 return error;
421 }
422 else if (uint32 serverVersion = connection->GetServerVersion(); serverVersion < ParseVersionString(TRINITY_REQUIRED_MYSQL_VERSION))
423 {
424 TC_LOG_ERROR("sql.driver", "TrinityCore does not support " TRINITY_MYSQL_FLAVOR " versions below " TRINITY_REQUIRED_MYSQL_VERSION " (found id {}, need id >= {}), please update your " TRINITY_MYSQL_FLAVOR " server", serverVersion, ParseVersionString(TRINITY_REQUIRED_MYSQL_VERSION));
425 return 1;
426 }
427 else
428 {
429 _connections[type].push_back(std::move(connection));
430 }
431 }
432
433 // Everything is fine
434 return 0;
435}
436
437template <class T>
438unsigned long DatabaseWorkerPool<T>::EscapeString(char* to, char const* from, unsigned long length)
439{
440 if (!to || !from || !length)
441 return 0;
442
443 return _connections[IDX_SYNCH].front()->EscapeString(to, from, length);
444}
445
446template <class T>
448{
449 _queue->Push(op);
450}
451
452template <class T>
454{
455 return _queue->Size();
456}
457
458template <class T>
460{
461#ifdef TRINITY_DEBUG
462 if (_warnSyncQueries)
463 {
464 std::ostringstream ss;
465 ss << boost::stacktrace::stacktrace();
466 TC_LOG_WARN("sql.performances", "Sync query at:\n{}", ss.str());
467 }
468#endif
469
470 uint8 i = 0;
471 auto const num_cons = _connections[IDX_SYNCH].size();
472 T* connection = nullptr;
474 for (;;)
475 {
476 connection = _connections[IDX_SYNCH][++i % num_cons].get();
478 if (connection->LockIfReady())
479 break;
480 }
481
482 return connection;
483}
484
485template <class T>
487{
488 return _connectionInfo->database.c_str();
489}
490
491template <class T>
493{
495 return;
496
497 BasicStatementTask* task = new BasicStatementTask(sql);
498 Enqueue(task);
499}
500
501template <class T>
503{
505 Enqueue(task);
506}
507
508template <class T>
510{
512 return;
513
514 T* connection = GetFreeConnection();
515 connection->Execute(sql);
516 connection->Unlock();
517}
518
519template <class T>
521{
522 T* connection = GetFreeConnection();
523 connection->Execute(stmt);
524 connection->Unlock();
525
527 delete stmt;
528}
529
530template <class T>
532{
533 if (!trans)
534 Execute(sql);
535 else
536 trans->Append(sql);
537}
538
539template <class T>
541{
542 if (!trans)
543 Execute(stmt);
544 else
545 trans->Append(stmt);
546}
547
std::future< PreparedQueryResult > PreparedQueryResultFuture
std::future< QueryResult > QueryResultFuture
std::shared_ptr< ResultSet > QueryResult
std::shared_ptr< Transaction< T > > SQLTransaction
std::shared_ptr< PreparedResultSet > PreparedQueryResult
std::future< bool > TransactionFuture
std::future< void > QueryResultHolderFuture
static consteval uint32 ParseVersionString(std::string_view chars)
#define TRINITY_COMPILED_CLIENT_VERSION
uint8_t uint8
Definition Define.h:135
#define TC_DATABASE_API
Definition Define.h:102
uint32_t uint32
Definition Define.h:133
#define WPFatal(cond,...)
Definition Errors.h:58
#define ABORT
Definition Errors.h:74
#define ASSERT
Definition Errors.h:68
#define TC_LOG_WARN(filterType__,...)
Definition Log.h:162
#define TC_LOG_DEBUG(filterType__,...)
Definition Log.h:156
#define TC_LOG_ERROR(filterType__,...)
Definition Log.h:165
#define TC_LOG_INFO(filterType__,...)
Definition Log.h:159
QueryResultFuture GetFuture() const
uint32 OpenConnections(InternalIndex type, uint8 numConnections)
void CommitTransaction(SQLTransaction< T > transaction)
void Enqueue(SQLOperation *op)
QueryResult Query(char const *sql, T *connection=nullptr)
bool PrepareStatements()
Prepares all prepared statements.
void SetConnectionInfo(std::string const &infoString, uint8 const asyncThreads, uint8 const synchThreads)
void ExecuteOrAppend(SQLTransaction< T > &trans, char const *sql)
SQLTransaction< T > BeginTransaction()
Begins an automanaged transaction pointer that will automatically roll back if not committed....
void DirectCommitTransaction(SQLTransaction< T > &transaction)
void KeepAlive()
Keeps all our MySQL connections alive, prevent the server from disconnecting us.
T::Statements PreparedStatementIndex
char const * GetDatabaseName() const
PreparedStatement< T > * GetPreparedStatement(PreparedStatementIndex index)
void DirectExecute(char const *sql)
SQLQueryHolderCallback DelayQueryHolder(std::shared_ptr< SQLQueryHolder< T > > holder)
void EscapeString(std::string &str)
Apply escape string'ing for current collation. (utf8)
void Execute(char const *sql)
QueryCallback AsyncQuery(char const *sql)
TransactionCallback AsyncCommitTransaction(SQLTransaction< T > transaction)
bool Execute() override
Operation for idle delaythreads.
uint64 GetRowCount() const
Definition QueryResult.h:62
PreparedQueryResultFuture GetFuture()
uint64 GetRowCount() const
Definition QueryResult.h:32
bool NextRow()
MySQLConnection * m_conn
QueryResultHolderFuture GetFuture()
Definition QueryHolder.h:63
TransactionFuture GetFuture()
Definition Transaction.h:96
bool IsFormatEmptyOrNull(char const *fmt)
Returns true if the given char pointer is null.