TrinityCore
Loading...
Searching...
No Matches
Socket.h
Go to the documentation of this file.
1/*
2 * This file is part of the TrinityCore Project. See AUTHORS file for Copyright information
3 *
4 * This program is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License as published by the
6 * Free Software Foundation; either version 2 of the License, or (at your
7 * option) any later version.
8 *
9 * This program is distributed in the hope that it will be useful, but WITHOUT
10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
12 * more details.
13 *
14 * You should have received a copy of the GNU General Public License along
15 * with this program. If not, see <http://www.gnu.org/licenses/>.
16 */
17
18#ifndef TRINITYCORE_SOCKET_H
19#define TRINITYCORE_SOCKET_H
20
21#include "Concepts.h"
22#include "Log.h"
23#include "MessageBuffer.h"
25#include <boost/asio/io_context.hpp>
26#include <boost/asio/ip/tcp.hpp>
27#include <atomic>
28#include <memory>
29#include <queue>
30#include <type_traits>
31
32using boost::asio::ip::tcp;
33
34#define READ_BLOCK_SIZE 4096
35#ifdef BOOST_ASIO_HAS_IOCP
36#define TC_SOCKET_USE_IOCP
37#endif
38
39namespace Trinity::Net
40{
41using IoContextTcpSocket = boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::io_context::executor_type>;
42
44{
46 Stop
47};
48
49template <typename Callable>
51
52template <typename SocketType>
54{
56 {
57 return this->Socket->ReadHandler();
58 }
59
60 SocketType* Socket;
61};
62
63template <typename SocketType>
65{
66 explicit ReadConnectionInitializer(SocketType* socket) : ReadCallback({ .Socket = socket }) { }
67
68 void Start() override
69 {
70 ReadCallback.Socket->AsyncRead(std::move(ReadCallback));
71
72 if (this->next)
73 this->next->Start();
74 }
75
77};
78
108template<class Stream = IoContextTcpSocket>
109class Socket : public std::enable_shared_from_this<Socket<Stream>>
110{
111public:
/// Wraps an existing socket.
/// The socket is moved into the underlying stream together with any extra
/// arguments, the remote endpoint's address and port are cached in
/// _remoteAddress / _remotePort, and the state starts as OpenState_Open.
/// NOTE(review): remote_endpoint() is called without an error_code argument;
/// presumably this throws if the peer is no longer connected - confirm for the
/// Stream type in use.
112 template<typename... Args>
113 explicit Socket(IoContextTcpSocket&& socket, Args&&... args) : _socket(std::move(socket), std::forward<Args>(args)...),
114 _remoteAddress(_socket.remote_endpoint().address()), _remotePort(_socket.remote_endpoint().port()), _openState(OpenState_Open)
115 {
116 }
117
/// Creates the underlying stream from an io_context plus any extra arguments.
/// The state starts as OpenState_Closed, so IsOpen() reports false for a
/// socket built this way; the cached remote address/port are not set here.
118 template<typename... Args>
119 explicit Socket(boost::asio::io_context& context, Args&&... args) : _socket(context, std::forward<Args>(args)...), _openState(OpenState_Closed)
120 {
121 }
122
123 Socket(Socket const& other) = delete;
124 Socket(Socket&& other) = delete;
125 Socket& operator=(Socket const& other) = delete;
126 Socket& operator=(Socket&& other) = delete;
127
128 virtual ~Socket()
129 {
131 boost::system::error_code error;
132 _socket.close(error);
133 }
134
/// Virtual hook with an empty default body.
135 virtual void Start() { }
136
137 virtual bool Update()
138 {
140 return false;
141
142#ifndef TC_SOCKET_USE_IOCP
144 return true;
145
146 for (; HandleQueue();)
147 ;
148#endif
149
150 return true;
151 }
152
/// Returns a reference to the cached remote address (_remoteAddress); the
/// socket itself is not queried.
153 boost::asio::ip::address const& GetRemoteIpAddress() const
154 {
155 return _remoteAddress;
156 }
157
159 {
160 return _remotePort;
161 }
162
163 template <SocketReadCallback Callback>
164 void AsyncRead(Callback&& callback)
165 {
166 if (!IsOpen())
167 return;
168
171 _socket.async_read_some(boost::asio::buffer(_readBuffer.GetWritePointer(), _readBuffer.GetRemainingSpace()),
172 [self = this->shared_from_this(), callback = std::forward<Callback>(callback)](boost::system::error_code const& error, size_t transferredBytes) mutable
173 {
174 if (self->ReadHandlerInternal(error, transferredBytes))
175 if (callback() == SocketReadCallbackResult::KeepReading)
176 self->AsyncRead(std::forward<Callback>(callback));
177 });
178 }
179
181 {
182 _writeQueue.push(std::move(buffer));
183
184#ifdef TC_SOCKET_USE_IOCP
186#endif
187 }
188
/// True only while the state equals OpenState_Open; a socket whose state has
/// the Closing or Closed bit set is reported as not open.
189 bool IsOpen() const { return _openState == OpenState_Open; }
190
192 {
193 if ((_openState.fetch_or(OpenState_Closed) & OpenState_Closed) == 0)
194 return;
195
196 boost::system::error_code shutdownError;
197 _socket.shutdown(boost::asio::socket_base::shutdown_send, shutdownError);
198 if (shutdownError)
199 TC_LOG_DEBUG("network", "Socket::CloseSocket: {} errored when shutting down socket: {} ({})", GetRemoteIpAddress().to_string(),
200 shutdownError.value(), shutdownError.message());
201
202 this->OnClose();
203 }
204
207 {
208 if (_openState.fetch_or(OpenState_Closing) != 0)
209 return;
210
211 if (_writeQueue.empty())
212 CloseSocket();
213 }
214
216
218 {
219 return _socket;
220 }
221
222protected:
/// Hook called at the end of the close routine, after the send side of the
/// socket has been shut down; the default implementation does nothing.
223 virtual void OnClose() { }
224
226
228 {
229 if (_isWritingAsync)
230 return false;
231
232 _isWritingAsync = true;
233
234#ifdef TC_SOCKET_USE_IOCP
235 MessageBuffer& buffer = _writeQueue.front();
236 _socket.async_write_some(boost::asio::buffer(buffer.GetReadPointer(), buffer.GetActiveSize()),
237 [self = this->shared_from_this()](boost::system::error_code const& error, std::size_t transferedBytes)
238 {
239 self->WriteHandler(error, transferedBytes);
240 });
241#else
242 _socket.async_wait(boost::asio::socket_base::wait_type::wait_write,
243 [self = this->shared_from_this()](boost::system::error_code const& error)
244 {
245 self->WriteHandlerWrapper(error);
246 });
247#endif
248
249 return false;
250 }
251
252 void SetNoDelay(bool enable)
253 {
254 boost::system::error_code err;
255 _socket.set_option(tcp::no_delay(enable), err);
256 if (err)
257 TC_LOG_DEBUG("network", "Socket::SetNoDelay: failed to set_option(boost::asio::ip::tcp::no_delay) for {} - {} ({})",
258 GetRemoteIpAddress().to_string(), err.value(), err.message());
259 }
260
261private:
262 bool ReadHandlerInternal(boost::system::error_code const& error, size_t transferredBytes)
263 {
264 if (error)
265 {
266 CloseSocket();
267 return false;
268 }
269
270 _readBuffer.WriteCompleted(transferredBytes);
271 return IsOpen();
272 }
273
274#ifdef TC_SOCKET_USE_IOCP
275
276 void WriteHandler(boost::system::error_code error, std::size_t transferedBytes)
277 {
278 if (!error)
279 {
280 _isWritingAsync = false;
281 _writeQueue.front().ReadCompleted(transferedBytes);
282 if (!_writeQueue.front().GetActiveSize())
283 _writeQueue.pop();
284
285 if (!_writeQueue.empty())
287 else if (_openState == OpenState_Closing)
288 CloseSocket();
289 }
290 else
291 CloseSocket();
292 }
293
294#else
295
/// Completion handler for the wait-for-writable operation started by
/// AsyncProcessQueue() in the non-IOCP build.
/// Clears the in-flight flag and makes one attempt to send queued data.
/// The error code passed to the handler is deliberately unused.
296 void WriteHandlerWrapper(boost::system::error_code const& /*error*/)
297 {
298 _isWritingAsync = false;
299 HandleQueue();
300 }
301
303 {
304 if (_writeQueue.empty())
305 return false;
306
307 MessageBuffer& queuedMessage = _writeQueue.front();
308
309 std::size_t bytesToSend = queuedMessage.GetActiveSize();
310
311 boost::system::error_code error;
312 std::size_t bytesSent = _socket.write_some(boost::asio::buffer(queuedMessage.GetReadPointer(), bytesToSend), error);
313
314 if (error)
315 {
316 if (error == boost::asio::error::would_block || error == boost::asio::error::try_again)
317 return AsyncProcessQueue();
318
319 _writeQueue.pop();
320 if (_openState == OpenState_Closing && _writeQueue.empty())
321 CloseSocket();
322 return false;
323 }
324 else if (bytesSent == 0)
325 {
326 _writeQueue.pop();
327 if (_openState == OpenState_Closing && _writeQueue.empty())
328 CloseSocket();
329 return false;
330 }
331 else if (bytesSent < bytesToSend) // now n > 0
332 {
333 queuedMessage.ReadCompleted(bytesSent);
334 return AsyncProcessQueue();
335 }
336
337 _writeQueue.pop();
338 if (_openState == OpenState_Closing && _writeQueue.empty())
339 CloseSocket();
340 return !_writeQueue.empty();
341 }
342
343#endif
344
345 Stream _socket;
346
347 boost::asio::ip::address _remoteAddress;
349
351 std::queue<MessageBuffer> _writeQueue;
352
353 // Socket open state "enum" (not enum to enable integral std::atomic api)
354 static constexpr uint8 OpenState_Open = 0x0;
355 static constexpr uint8 OpenState_Closing = 0x1;
356 static constexpr uint8 OpenState_Closed = 0x2;
357
358 std::atomic<uint8> _openState;
359
360 bool _isWritingAsync = false;
361};
362}
363
364#endif // TRINITYCORE_SOCKET_H
uint8_t uint8
Definition Define.h:135
uint16_t uint16
Definition Define.h:134
#define TC_LOG_DEBUG(filterType__,...)
Definition Log.h:156
#define READ_BLOCK_SIZE
Definition Socket.h:34
size_type GetRemainingSpace() const
void ReadCompleted(size_type bytes)
void WriteCompleted(size_type bytes)
uint8 * GetReadPointer()
size_type GetActiveSize() const
uint8 * GetWritePointer()
void EnsureFreeSpace()
uint16 GetRemotePort() const
Definition Socket.h:158
static constexpr uint8 OpenState_Closed
Definition Socket.h:356
std::atomic< uint8 > _openState
Definition Socket.h:358
Socket(Socket const &other)=delete
bool ReadHandlerInternal(boost::system::error_code const &error, size_t transferredBytes)
Definition Socket.h:262
void SetNoDelay(bool enable)
Definition Socket.h:252
Socket(IoContextTcpSocket &&socket, Args &&... args)
Definition Socket.h:113
std::queue< MessageBuffer > _writeQueue
Definition Socket.h:351
virtual SocketReadCallbackResult ReadHandler()
Definition Socket.h:225
Socket & operator=(Socket const &other)=delete
bool AsyncProcessQueue()
Definition Socket.h:227
bool IsOpen() const
Definition Socket.h:189
Stream & underlying_stream()
Definition Socket.h:217
boost::asio::ip::address const & GetRemoteIpAddress() const
Definition Socket.h:153
static constexpr uint8 OpenState_Closing
Transition to Closed state after sending all queued data.
Definition Socket.h:355
static constexpr uint8 OpenState_Open
Definition Socket.h:354
virtual void OnClose()
Definition Socket.h:223
virtual bool Update()
Definition Socket.h:137
boost::asio::ip::address _remoteAddress
Definition Socket.h:347
Socket(boost::asio::io_context &context, Args &&... args)
Definition Socket.h:119
void AsyncRead(Callback &&callback)
Definition Socket.h:164
void QueuePacket(MessageBuffer &&buffer)
Definition Socket.h:180
void DelayedCloseSocket()
Marks the socket for closing after write buffer becomes empty.
Definition Socket.h:206
MessageBuffer _readBuffer
Definition Socket.h:350
void WriteHandlerWrapper(boost::system::error_code const &)
Definition Socket.h:296
virtual void Start()
Definition Socket.h:135
Socket(Socket &&other)=delete
Socket & operator=(Socket &&other)=delete
virtual ~Socket()
Definition Socket.h:128
MessageBuffer & GetReadBuffer()
Definition Socket.h:215
SocketReadCallbackResult
Definition Socket.h:44
boost::asio::basic_stream_socket< boost::asio::ip::tcp, boost::asio::io_context::executor_type > IoContextTcpSocket
Definition Socket.h:41
STL namespace.
SocketReadCallbackResult operator()() const
Definition Socket.h:55
InvokeReadHandlerCallback< SocketType > ReadCallback
Definition Socket.h:76
ReadConnectionInitializer(SocketType *socket)
Definition Socket.h:66
std::shared_ptr< SocketConnectionInitializer > next